diff --git a/Cargo.lock b/Cargo.lock index f9fdd46..a4d7f67 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,17 @@ # It is not intended for manual editing. version = 4 +[[package]] +name = "aes" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b169f7a6d4742236a0a00c541b845991d0ac43e546831af1249753ab4c3aa3a0" +dependencies = [ + "cfg-if", + "cipher", + "cpufeatures", +] + [[package]] name = "aho-corasick" version = "1.1.4" @@ -93,6 +104,24 @@ version = "2.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "843867be96c8daad0d758b57df9392b6d8d271134fce549de6ce169ff98a92af" +[[package]] +name = "block-buffer" +version = "0.10.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3078c7629b62d3f0439517fa394996acacc5cbc91c5a20d8c658e77abd503a71" +dependencies = [ + "generic-array", +] + +[[package]] +name = "block-padding" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8894febbff9f758034a5b8e12d87918f56dfc64a8e1fe757d65e29041538d93" +dependencies = [ + "generic-array", +] + [[package]] name = "bumpalo" version = "3.20.2" @@ -111,6 +140,15 @@ version = "1.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e748733b7cbc798e1434b6ac524f0c1ff2ab456fe201501e6497c8417a4fc33" +[[package]] +name = "cbc" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26b52a9543ae338f279b96b0b9fed9c8093744685043739079ce85cd58f289a6" +dependencies = [ + "cipher", +] + [[package]] name = "cc" version = "1.2.57" @@ -150,6 +188,16 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" +[[package]] +name = "cipher" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "773f3b9af64447d2ce9850330c473515014aa235e6a783b02db81ff39e4a3dad" +dependencies = [ + "crypto-common", + "inout", +] + [[package]] name = "clang-sys" version = "1.8.1" @@ -221,12 +269,51 @@ dependencies = [ "windows", ] +[[package]] +name = "cpufeatures" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59ed5838eebb26a2bb2e58f6d5b5316989ae9d08bab10e0e6d103e656d1b0280" +dependencies = [ + "libc", +] + +[[package]] +name = "crypto-common" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78c8292055d1c1df0cce5d180393dc8cce0abec0a7102adb6c7b1eef6016d60a" +dependencies = [ + "generic-array", + "typenum", +] + +[[package]] +name = "ctr" +version = "0.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0369ee1ad671834580515889b80f2ea915f23b8be8d0daa4bbaf2ac5c7590835" +dependencies = [ + "cipher", +] + [[package]] name = "dasp_sample" version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c87e182de0887fd5361989c677c4e8f5000cd9491d6d563161a8f3a5519fc7f" +[[package]] +name = "digest" +version = "0.10.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292" +dependencies = [ + "block-buffer", + "crypto-common", + "subtle", +] + [[package]] name = "dirs" version = "5.0.1" @@ -372,6 +459,16 @@ dependencies = [ "slab", ] +[[package]] +name = "generic-array" +version = "0.14.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85649ca51fd72272d7821adaf274ad91c288277713d9c18820d8499a7ff69e9a" +dependencies = [ + "typenum", + "version_check", +] + [[package]] name = "getrandom" version = "0.2.17" @@ -411,6 +508,30 @@ version = "0.16.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "841d1cc9bed7f9236f321df977030373f4a4163ae1a7dbfe1a51a2c1a51d9100" +[[package]] +name = "hex" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" + +[[package]] +name = "hkdf" +version = "0.12.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b5f8eb2ad728638ea2c7d47a21db23b7b58a72ed6a38256b8a1849f15fbbdf7" +dependencies = [ + "hmac", +] + +[[package]] +name = "hmac" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c49c37c09c17a53d937dfbb742eb3a961d65a994e6bcdcf37e7399d0cc8ab5e" +dependencies = [ + "digest", +] + [[package]] name = "http" version = "1.4.0" @@ -623,6 +744,16 @@ dependencies = [ "hashbrown", ] +[[package]] +name = "inout" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "879f10e63c20629ecabbb64a8010319738c66a5cd0c29b02d63d272b03751d01" +dependencies = [ + "block-padding", + "generic-array", +] + [[package]] name = "ipnet" version = "2.12.0" @@ -1054,14 +1185,21 @@ dependencies = [ name = "qobuz-backend" version = "0.1.0" dependencies = [ + "aes", "anyhow", + "base64", + "cbc", "cpal", + "ctr", "dirs", + "hex", + "hkdf", "md5", "rb", "reqwest", "serde", "serde_json", + "sha2", "symphonia", "thiserror 2.0.18", "tokio", @@ -1410,6 +1548,17 @@ dependencies = [ "serde", ] +[[package]] +name = "sha2" +version = "0.10.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a7507d819769d01a365ab707794a4084392c824f54a7a6a7862f8c3d0892b283" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest", +] + [[package]] name = "shlex" version = "1.3.0" @@ -1943,6 +2092,12 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "typenum" +version = "1.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "562d481066bde0658276a35467c4af00bdc6ee726305698a55b86e61d7ad82bb" + [[package]] name = "unicode-ident" version = "1.0.24" @@ -1973,6 +2128,12 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be" +[[package]] +name = "version_check" +version = "0.9.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" + [[package]] name = "walkdir" version = "2.5.0" diff --git a/rust/Cargo.toml b/rust/Cargo.toml index e3f7b9d..0352d2f 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -28,6 +28,15 @@ md5 = "0.7" dirs = "5" toml = "0.8" +# Crypto — qbz-1 key derivation + segment decryption +aes = "0.8" +cbc = { version = "0.1", features = ["alloc"] } +ctr = "0.9" +hkdf = "0.12" +sha2 = "0.10" +hex = "0.4" +base64 = "0.22" + [profile.release] lto = "thin" opt-level = 3 diff --git a/rust/include/qobuz_backend.h b/rust/include/qobuz_backend.h index c37caaf..ea6895a 100644 --- a/rust/include/qobuz_backend.h +++ b/rust/include/qobuz_backend.h @@ -75,17 +75,13 @@ uint64_t qobuz_backend_get_duration(const QobuzBackendOpaque *backend); uint8_t qobuz_backend_get_volume(const QobuzBackendOpaque *backend); int qobuz_backend_get_state(const QobuzBackendOpaque *backend); int qobuz_backend_take_track_finished(QobuzBackendOpaque *backend); +int qobuz_backend_take_track_transitioned(QobuzBackendOpaque *backend); // ReplayGain / Gapless void qobuz_backend_set_replaygain(QobuzBackendOpaque *backend, bool enabled); void qobuz_backend_set_gapless(QobuzBackendOpaque *backend, bool enabled); void qobuz_backend_prefetch_track(QobuzBackendOpaque *backend, int64_t track_id, int32_t format_id); -// Visualizer PCM access -uint32_t qobuz_backend_viz_read(QobuzBackendOpaque *backend, float *buf, uint32_t max_samples); -uint32_t qobuz_backend_viz_sample_rate(const QobuzBackendOpaque *backend); -uint32_t qobuz_backend_viz_channels(const QobuzBackendOpaque *backend); - // Artist releases (auto-paginates to fetch all) void qobuz_backend_get_artist_releases(QobuzBackendOpaque *backend, int64_t artist_id, const char *release_type, uint32_t limit, uint32_t offset); diff --git a/rust/src/api/client.rs b/rust/src/api/client.rs index fdf42ca..9c6429e 100644 --- a/rust/src/api/client.rs +++ b/rust/src/api/client.rs @@ -1,6 +1,10 @@ use anyhow::{bail, Result}; +use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine}; +use cbc::cipher::{block_padding::NoPadding, BlockDecryptMut, KeyIvInit}; +use hkdf::Hkdf; use reqwest::Client; use serde_json::Value; +use sha2::Sha256; use std::time::{SystemTime, UNIX_EPOCH}; use super::models::*; @@ -15,10 +19,66 @@ pub const DEFAULT_APP_SECRET: &str = "e79f8b9be485692b0e5f9dd895826368"; pub struct QobuzClient { http: Client, pub auth_token: Option, + /// Playback session ID from POST /session/start — sent as X-Session-Id for /file/url. + session_id: Option, + session_expires_at: Option, + /// `infos` field from the session/start response — used to derive the KEK for track key unwrapping. + session_infos: Option, app_id: String, app_secret: String, } +// ── qbz-1 key derivation helpers ───────────────────────────────────────────── + +type Aes128CbcDec = cbc::Decryptor; + +/// Decode a base64url string (with or without padding). +fn b64url_decode(s: &str) -> Result> { + Ok(URL_SAFE_NO_PAD.decode(s.trim_end_matches('='))?) +} + +/// Full qbz-1 key derivation: +/// Phase 1: HKDF-SHA256(ikm=hex(app_secret), salt=b64url(infos[0]), info=b64url(infos[1])) → 16-byte KEK +/// Phase 2: AES-128-CBC/NoPadding(key=KEK, iv=b64url(key_field[2])).decrypt(b64url(key_field[1]))[..16] +fn derive_track_key(session_infos: &str, app_secret_hex: &str, key_field: &str) -> Result<[u8; 16]> { + // Phase 1: HKDF + let infos_parts: Vec<&str> = session_infos.splitn(2, '.').collect(); + if infos_parts.len() != 2 { + bail!("session_infos must be '.', got: {session_infos}"); + } + let salt = b64url_decode(infos_parts[0])?; + let info = b64url_decode(infos_parts[1])?; + let ikm = hex::decode(app_secret_hex)?; + + let hk = Hkdf::::new(Some(&salt), &ikm); + let mut kek = [0u8; 16]; + hk.expand(&info, &mut kek) + .map_err(|e| anyhow::anyhow!("HKDF expand failed: {e:?}"))?; + + // Phase 2: AES-128-CBC/NoPadding + let key_parts: Vec<&str> = key_field.splitn(3, '.').collect(); + if key_parts.len() != 3 || key_parts[0] != "qbz-1" { + bail!("unexpected key field format: {key_field}"); + } + let ct = b64url_decode(key_parts[1])?; + let iv_bytes = b64url_decode(key_parts[2])?; + if ct.len() < 16 || iv_bytes.len() < 16 { + bail!("key field ciphertext/iv too short ({} / {} bytes)", ct.len(), iv_bytes.len()); + } + + let iv: [u8; 16] = iv_bytes[..16].try_into()?; + let mut buf = ct; + let decrypted = Aes128CbcDec::new(&kek.into(), &iv.into()) + .decrypt_padded_mut::(&mut buf) + .map_err(|e| anyhow::anyhow!("AES-CBC decrypt failed: {e:?}"))?; + + let mut track_key = [0u8; 16]; + track_key.copy_from_slice(&decrypted[..16]); + Ok(track_key) +} + +// ───────────────────────────────────────────────────────────────────────────── + impl QobuzClient { pub fn new() -> Result { Self::new_with_config(None, None) @@ -44,6 +104,9 @@ impl QobuzClient { Ok(Self { http, auth_token: None, + session_id: None, + session_expires_at: None, + session_infos: None, app_id, app_secret, }) @@ -106,20 +169,16 @@ impl QobuzClient { if let Some(token) = &self.auth_token { builder = builder.header("Authorization", format!("Bearer {}", token)); } + if let Some(sid) = &self.session_id { + builder = builder.header("X-Session-Id", sid.as_str()); + } builder } // --- Auth --- - pub async fn login(&mut self, email: &str, password: &str) -> Result { - match self.oauth2_login(email, password).await { - Ok(r) => Ok(r), - Err(_) => self.legacy_login(email, password).await, - } - } - /// NOTE: Qobuz API requires credentials as GET query params — not our choice. - async fn oauth2_login(&mut self, email: &str, password: &str) -> Result { + pub async fn login(&mut self, email: &str, password: &str) -> Result { let ts = Self::ts(); let mut sign_params: Vec<(&str, String)> = vec![ ("password", password.to_string()), @@ -144,44 +203,14 @@ impl QobuzClient { let body: Value = resp.json().await?; if !status.is_success() { let msg = body.get("message").and_then(|m| m.as_str()).unwrap_or("login failed"); - bail!("oauth2 login failed ({}): {}", status, msg); - } - - self.extract_and_store_token(serde_json::from_value(body)?) - } - - async fn legacy_login(&mut self, email: &str, password: &str) -> Result { - let ts = Self::ts(); - let mut sign_params: Vec<(&str, String)> = vec![ - ("email", email.to_string()), - ("password", password.to_string()), - ]; - let sig = self.request_sig("userlogin", &mut sign_params, ts); - - let resp = self - .http - .get(self.url("user/login")) - .query(&[ - ("app_id", self.app_id.as_str()), - ("email", email), - ("password", password), - ("request_ts", ts.to_string().as_str()), - ("request_sig", sig.as_str()), - ]) - .send() - .await?; - - let status = resp.status(); - let body: Value = resp.json().await?; - if !status.is_success() { - let msg = body.get("message").and_then(|m| m.as_str()).unwrap_or("login failed"); - bail!("user login failed ({}): {}", status, msg); + bail!("login failed ({}): {}", status, msg); } self.extract_and_store_token(serde_json::from_value(body)?) } fn extract_and_store_token(&mut self, login: OAuthLoginResponse) -> Result { + // auth_token = OAuth2 bearer (preferred) or legacy session token if let Some(token) = login .oauth2 .as_ref() @@ -190,9 +219,57 @@ impl QobuzClient { { self.auth_token = Some(token); } + // Reset any cached playback session — it belongs to a different auth context. + self.session_id = None; + self.session_expires_at = None; + self.session_infos = None; Ok(login) } + /// Start a playback session via POST /session/start. + /// The returned session_id is required as X-Session-Id when calling /file/url. + /// Sessions expire; we cache and reuse until 60s before expiry. + async fn ensure_session(&mut self) -> Result<()> { + let now = Self::ts(); + if let (Some(_), Some(exp)) = (&self.session_id, self.session_expires_at) { + if now + 60 < exp { + return Ok(()); // still valid + } + } + + let ts = Self::ts(); + let mut sign_params: Vec<(&str, String)> = vec![ + ("profile", "qbz-1".to_string()), + ]; + let sig = self.request_sig("sessionstart", &mut sign_params, ts); + + let resp = self + .http + .post(self.url("session/start")) + .query(&[ + ("app_id", self.app_id.as_str()), + ("request_ts", ts.to_string().as_str()), + ("request_sig", sig.as_str()), + ]) + .header("Authorization", format!("Bearer {}", self.auth_token.as_deref().unwrap_or(""))) + .form(&[("profile", "qbz-1")]) + .send() + .await?; + + let body = Self::check_response(resp).await?; + let session_id = body["session_id"] + .as_str() + .ok_or_else(|| anyhow::anyhow!("session/start: no session_id in response"))? + .to_string(); + let expires_at = body["expires_at"].as_u64().unwrap_or(now + 3600); + let infos = body["infos"].as_str().map(|s| s.to_string()); + eprintln!("[session] started session_id={}... expires_at={} infos={:?}", &session_id[..session_id.len().min(8)], expires_at, infos); + self.session_id = Some(session_id); + self.session_expires_at = Some(expires_at); + self.session_infos = infos; + Ok(()) + } + // --- User --- pub async fn get_user(&self) -> Result { @@ -215,7 +292,9 @@ impl QobuzClient { Ok(serde_json::from_value(body)?) } - pub async fn get_track_url(&self, track_id: i64, format: Format) -> Result { + pub async fn get_track_url(&mut self, track_id: i64, format: Format) -> Result { + self.ensure_session().await?; + let ts = Self::ts(); let intent = "stream"; let mut sign_params: Vec<(&str, String)> = vec![ @@ -223,10 +302,10 @@ impl QobuzClient { ("intent", intent.to_string()), ("track_id", track_id.to_string()), ]; - let sig = self.request_sig("trackgetFileUrl", &mut sign_params, ts); + let sig = self.request_sig("fileurl", &mut sign_params, ts); let resp = self - .get_request("track/getFileUrl") + .get_request("file/url") .query(&[ ("track_id", track_id.to_string()), ("format_id", format.id().to_string()), @@ -238,7 +317,24 @@ impl QobuzClient { .await?; let body = Self::check_response(resp).await?; - Ok(serde_json::from_value(body)?) + eprintln!("[file/url] response: {}", serde_json::to_string(&body).unwrap_or_default()); + let mut url_dto: TrackFileUrlDto = serde_json::from_value(body)?; + + // Unwrap the per-track key: decrypt the CBC-wrapped key using HKDF-derived KEK. + if let (Some(key_field), Some(infos)) = (url_dto.key.clone(), self.session_infos.as_deref()) { + match derive_track_key(infos, &self.app_secret, &key_field) { + Ok(track_key) => { + url_dto.key = Some(hex::encode(track_key)); + eprintln!("[key] track key unwrapped OK"); + } + Err(e) => { + eprintln!("[key] track key unwrap failed: {e}"); + url_dto.key = None; // disable decryption rather than play garbage + } + } + } + + Ok(url_dto) } // --- Album --- @@ -289,17 +385,48 @@ impl QobuzClient { // --- Search --- pub async fn search(&self, query: &str, offset: u32, limit: u32) -> Result { + let (tracks, albums, artists) = tokio::try_join!( + self.search_tracks(query, offset, limit), + self.search_albums(query, offset, limit), + self.search_artists(query, offset, limit), + )?; + Ok(SearchCatalogDto { + query: Some(query.to_string()), + albums: Some(albums), + tracks: Some(tracks), + artists: Some(artists), + playlists: None, + }) + } + + async fn search_tracks(&self, query: &str, offset: u32, limit: u32) -> Result> { let resp = self - .get_request("catalog/search") - .query(&[ - ("query", query), - ("offset", &offset.to_string()), - ("limit", &limit.to_string()), - ]) + .get_request("track/search") + .query(&[("query", query), ("offset", &offset.to_string()), ("limit", &limit.to_string())]) .send() .await?; let body = Self::check_response(resp).await?; - Ok(serde_json::from_value(body)?) + Ok(serde_json::from_value(body["tracks"].clone())?) + } + + async fn search_albums(&self, query: &str, offset: u32, limit: u32) -> Result> { + let resp = self + .get_request("album/search") + .query(&[("query", query), ("offset", &offset.to_string()), ("limit", &limit.to_string())]) + .send() + .await?; + let body = Self::check_response(resp).await?; + Ok(serde_json::from_value(body["albums"].clone())?) + } + + async fn search_artists(&self, query: &str, offset: u32, limit: u32) -> Result> { + let resp = self + .get_request("artist/search") + .query(&[("query", query), ("offset", &offset.to_string()), ("limit", &limit.to_string())]) + .send() + .await?; + let body = Self::check_response(resp).await?; + Ok(serde_json::from_value(body["artists"].clone())?) } // --- Favorites / Library --- @@ -329,34 +456,103 @@ impl QobuzClient { Ok(serde_json::from_value(body)?) } - pub async fn get_fav_tracks(&self, offset: u32, limit: u32) -> Result> { + /// Fetch all favorite IDs (tracks, albums, artists) in one call. + async fn get_fav_ids(&self) -> Result { let resp = self - .get_request("favorite/getUserFavorites") - .query(&[("type", "tracks"), ("offset", &offset.to_string()), ("limit", &limit.to_string())]) + .get_request("favorite/getUserFavoriteIds") .send() .await?; let body = Self::check_response(resp).await?; - Ok(serde_json::from_value(body["tracks"].clone())?) + Ok(serde_json::from_value(body)?) + } + + /// Batch-fetch tracks by ID via POST /track/getList. + async fn get_tracks_by_ids(&self, ids: &[i64]) -> Result> { + if ids.is_empty() { + return Ok(vec![]); + } + let resp = self + .post_request("track/getList") + .json(&serde_json::json!({ "tracks_id": ids })) + .send() + .await?; + let body = Self::check_response(resp).await?; + let items: Vec = serde_json::from_value( + body["tracks"]["items"].clone(), + ) + .unwrap_or_default(); + Ok(items) + } + + pub async fn get_fav_tracks(&self, offset: u32, limit: u32) -> Result> { + let ids = self.get_fav_ids().await?; + let all_ids = ids.tracks.unwrap_or_default(); + let total = all_ids.len() as i32; + let page: Vec = all_ids + .into_iter() + .skip(offset as usize) + .take(limit as usize) + .collect(); + let items = self.get_tracks_by_ids(&page).await?; + Ok(SearchResultItems { + items: Some(items), + total: Some(total), + offset: Some(offset as i32), + limit: Some(limit as i32), + }) } pub async fn get_fav_albums(&self, offset: u32, limit: u32) -> Result> { - let resp = self - .get_request("favorite/getUserFavorites") - .query(&[("type", "albums"), ("offset", &offset.to_string()), ("limit", &limit.to_string())]) - .send() - .await?; - let body = Self::check_response(resp).await?; - Ok(serde_json::from_value(body["albums"].clone())?) + let ids = self.get_fav_ids().await?; + let all_ids = ids.albums.unwrap_or_default(); + let total = all_ids.len() as i32; + let page: Vec<&str> = all_ids + .iter() + .skip(offset as usize) + .take(limit as usize) + .map(|s| s.as_str()) + .collect(); + let mut items = Vec::with_capacity(page.len()); + for album_id in page { + match self.get_album(album_id).await { + Ok(a) => items.push(a), + Err(e) => eprintln!("[fav] failed to fetch album {}: {}", album_id, e), + } + } + Ok(SearchResultItems { + items: Some(items), + total: Some(total), + offset: Some(offset as i32), + limit: Some(limit as i32), + }) } pub async fn get_fav_artists(&self, offset: u32, limit: u32) -> Result> { - let resp = self - .get_request("favorite/getUserFavorites") - .query(&[("type", "artists"), ("offset", &offset.to_string()), ("limit", &limit.to_string())]) - .send() - .await?; - let body = Self::check_response(resp).await?; - Ok(serde_json::from_value(body["artists"].clone())?) + let ids = self.get_fav_ids().await?; + let all_ids = ids.artists.unwrap_or_default(); + let total = all_ids.len() as i32; + let page: Vec = all_ids + .into_iter() + .skip(offset as usize) + .take(limit as usize) + .collect(); + let mut items = Vec::with_capacity(page.len()); + for artist_id in page { + match self.get_artist_page(artist_id).await { + Ok(v) => { + if let Ok(a) = serde_json::from_value::(v) { + items.push(a); + } + } + Err(e) => eprintln!("[fav] failed to fetch artist {}: {}", artist_id, e), + } + } + Ok(SearchResultItems { + items: Some(items), + total: Some(total), + offset: Some(offset as i32), + limit: Some(limit as i32), + }) } // --- Playlist management --- diff --git a/rust/src/api/models.rs b/rust/src/api/models.rs index 8efc3b7..a729a57 100644 --- a/rust/src/api/models.rs +++ b/rust/src/api/models.rs @@ -74,12 +74,16 @@ pub struct AudioInfoDto { #[derive(Debug, Deserialize, Clone, Serialize)] pub struct TrackFileUrlDto { pub track_id: Option, - pub duration: Option, + pub duration: Option, pub url: Option, + pub url_template: Option, + pub n_segments: Option, pub format_id: Option, pub mime_type: Option, pub sampling_rate: Option, pub bit_depth: Option, + /// qbz-1 encryption key: "qbz-1.." + pub key: Option, } // --- Album --- @@ -219,6 +223,15 @@ pub struct FavArtistDto { pub image: Option, } +// --- Favorite IDs --- + +#[derive(Debug, Deserialize, Serialize)] +pub struct FavIdsDto { + pub tracks: Option>, + pub albums: Option>, + pub artists: Option>, +} + // --- Format --- #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] diff --git a/rust/src/lib.rs b/rust/src/lib.rs index 7a8255d..4c13983 100644 --- a/rust/src/lib.rs +++ b/rust/src/lib.rs @@ -81,6 +81,9 @@ struct PrefetchedTrack { track_id: i64, track: api::models::TrackDto, url: String, + n_segments: u32, + encryption_key: Option, + prefetch_data: Option, } struct BackendInner { @@ -476,8 +479,9 @@ pub unsafe extern "C" fn qobuz_backend_play_track( } }; - let (track, url) = if let Some(pf) = cached { - (pf.track, pf.url) + // Extract prefetch_data to embed directly into TrackInfo + let (track, url, n_segments, encryption_key, prefetch_data) = if let Some(pf) = cached { + (pf.track, pf.url, pf.n_segments, pf.encryption_key, pf.prefetch_data) } else { // Fetch track metadata let track = match client.lock().await.get_track(track_id).await { @@ -489,11 +493,24 @@ pub unsafe extern "C" fn qobuz_backend_play_track( Ok(u) => u, Err(e) => { call_cb(cb, ud, EV_TRACK_URL_ERR, &err_json(&e.to_string())); return; } }; - let url = match url_dto.url { - Some(u) => u, - None => { call_cb(cb, ud, EV_TRACK_URL_ERR, &err_json("no stream URL")); return; } + let encryption_key = url_dto.key.clone(); + // Prefer segmented url_template (reliable CDN path), fall back to plain url + eprintln!("[lib] url_dto: url={:?}, url_template={:?}, n_segments={:?}, mime={:?}, key_present={}", + url_dto.url.as_deref().map(|u| &u[..u.len().min(60)]), + url_dto.url_template.as_deref().map(|u| &u[..u.len().min(60)]), + url_dto.n_segments, + url_dto.mime_type, + encryption_key.is_some()); + let (url, n_segments) = if let (Some(tmpl), Some(n)) = (url_dto.url_template, url_dto.n_segments) { + (tmpl, n) + } else if let Some(u) = url_dto.url { + (u, 0u32) + } else { + call_cb(cb, ud, EV_TRACK_URL_ERR, &err_json("no stream URL")); + return; }; - (track, url) + eprintln!("[lib] resolved: n_segments={n_segments}, url_prefix={}", &url[..url.len().min(80)]); + (track, url, n_segments, encryption_key, None) }; // 2. Notify track change @@ -513,7 +530,7 @@ pub unsafe extern "C" fn qobuz_backend_play_track( if let Some(dur) = track.duration { status.duration_secs.store(dur as u64, std::sync::atomic::Ordering::Relaxed); } - let _ = cmd_tx.send(player::PlayerCommand::Play(player::TrackInfo { track, url, replaygain_db })); + let _ = cmd_tx.send(player::PlayerCommand::Play(player::TrackInfo { track, url, n_segments, encryption_key, replaygain_db, prefetch_data })); // 5. State notification call_cb(cb, ud, EV_STATE_CHANGED, r#"{"state":"playing"}"#); @@ -582,6 +599,28 @@ pub unsafe extern "C" fn qobuz_backend_take_track_finished(ptr: *mut Backend) -> if finished { 1 } else { 0 } } +#[no_mangle] +pub unsafe extern "C" fn qobuz_backend_take_track_transitioned(ptr: *mut Backend) -> c_int { + let inner = &(*ptr).0; + let transitioned = inner + .player + .status + .track_transitioned + .swap(false, std::sync::atomic::Ordering::SeqCst); + + if transitioned { + // Emit track changed so the Qt UI and Scrobbler automatically pick up the new song + if let Some(track) = inner.player.status.current_track.lock().unwrap().as_ref() { + if let Ok(j) = serde_json::to_string(track) { + call_cb(inner.cb, inner.ud, EV_TRACK_CHANGED, &j); + } + } + 1 + } else { + 0 + } +} + // ---------- ReplayGain / Gapless ---------- #[no_mangle] @@ -600,10 +639,11 @@ pub unsafe extern "C" fn qobuz_backend_prefetch_track( track_id: i64, format_id: i32, ) { - let inner = &(*ptr).0; + let inner = &(*ptr).0; let client = inner.client.clone(); - let prefetch = inner.prefetch.clone(); let format = Format::from_id(format_id); + let cmd_tx = inner.player.cmd_tx.clone(); + let rg_enabled = inner.replaygain_enabled.clone(); spawn(inner, async move { let track = match client.lock().await.get_track(track_id).await { @@ -614,11 +654,41 @@ pub unsafe extern "C" fn qobuz_backend_prefetch_track( Ok(u) => u, Err(_) => return, }; - let url = match url_dto.url { - Some(u) => u, - None => return, + let encryption_key = url_dto.key.clone(); + let (url, n_segments) = if let (Some(tmpl), Some(n)) = (url_dto.url_template, url_dto.n_segments) { + (tmpl, n) + } else if let Some(u) = url_dto.url { + (u, 0u32) + } else { + return; }; - *prefetch.lock().await = Some(PrefetchedTrack { track_id, track, url }); + + // KICKSTART DOWNLOADING IMMEDIATELY + let prefetch_data = if n_segments > 0 { + Some(player::decoder::start_prefetch( + url.clone(), + n_segments, + encryption_key.as_deref(), + 1, + )) + } else { + None + }; + + let replaygain_db = if rg_enabled.load(std::sync::atomic::Ordering::Relaxed) { + track.audio_info.as_ref().and_then(|ai| ai.replaygain_track_gain) + } else { + None + }; + + let _ = cmd_tx.send(player::PlayerCommand::QueueNext(player::TrackInfo { + track, + url, + n_segments, + encryption_key, + replaygain_db, + prefetch_data, + })); }); } @@ -717,34 +787,6 @@ pub unsafe extern "C" fn qobuz_backend_get_user(ptr: *mut Backend) { }); } -// ---------- Visualizer PCM access ---------- - -/// Read up to `max_samples` f32 PCM values into `buf`. -/// Returns the number of samples actually read. -#[no_mangle] -pub unsafe extern "C" fn qobuz_backend_viz_read( - ptr: *mut Backend, - buf: *mut f32, - max_samples: u32, -) -> u32 { - let consumer = &(*ptr).0.player.status.viz_consumer; - let Ok(mut lock) = consumer.try_lock() else { return 0 }; - let slice = std::slice::from_raw_parts_mut(buf, max_samples as usize); - rb::RbConsumer::read(&mut *lock, slice).unwrap_or(0) as u32 -} - -/// Returns current sample rate of the audio stream (0 if idle). -#[no_mangle] -pub unsafe extern "C" fn qobuz_backend_viz_sample_rate(ptr: *const Backend) -> u32 { - (*ptr).0.player.status.viz_sample_rate.load(std::sync::atomic::Ordering::Relaxed) -} - -/// Returns current channel count (0 if idle). -#[no_mangle] -pub unsafe extern "C" fn qobuz_backend_viz_channels(ptr: *const Backend) -> u32 { - (*ptr).0.player.status.viz_channels.load(std::sync::atomic::Ordering::Relaxed) -} - // ---------- Playlist management ---------- pub const EV_PLAYLIST_CREATED: c_int = 20; diff --git a/rust/src/player/decoder.rs b/rust/src/player/decoder.rs index 7d9599f..1adea75 100644 --- a/rust/src/player/decoder.rs +++ b/rust/src/player/decoder.rs @@ -1,12 +1,17 @@ use anyhow::Result; -use rb::RB; +use std::collections::VecDeque; use std::io::{self, Read, Seek, SeekFrom}; use std::sync::{ atomic::{AtomicBool, Ordering}, - Arc, + Arc, Condvar, Mutex, OnceLock, }; +use aes::Aes128; +use ctr::cipher::{KeyIvInit, StreamCipher}; +use ctr::Ctr128BE; + use symphonia::core::{ + audio::SampleBuffer, codecs::DecoderOptions, errors::Error as SymphoniaError, formats::{FormatOptions, SeekMode, SeekTo}, @@ -18,10 +23,723 @@ use symphonia::core::{ use super::{output::AudioOutput, PlayerCommand, PlayerStatus, TrackInfo}; -/// First 512 KiB of stream kept in memory to support backward seeks during probing. +// ── Bounded Segmented streaming infrastructure ─────────────────────────────── + +const MAX_BUF_BYTES: usize = 4 * 1024 * 1024; // 4MB cache limit + +#[derive(Clone)] +pub struct PrefetchData { + pub buf: Arc, + pub is_flac_flag: Arc, + pub cancel: Arc, + pub initial_skip_secs: f64, +} + +pub enum NextAction { + Play(TrackInfo), + Transition(TrackInfo), +} + +pub struct SharedBuf { + queue: Mutex>, + done: AtomicBool, + condvar_read: Condvar, + condvar_write: Condvar, +} + +impl SharedBuf { + fn new() -> Self { + Self { + queue: Mutex::new(VecDeque::with_capacity(1024 * 1024)), + done: AtomicBool::new(false), + condvar_read: Condvar::new(), + condvar_write: Condvar::new(), + } + } + + fn append(&self, bytes: &[u8], cancel: &Arc) { + let mut q = self.queue.lock().unwrap(); + let mut offset = 0; + + while offset < bytes.len() { + if cancel.load(Ordering::Acquire) { + break; + } + if q.len() >= MAX_BUF_BYTES { + let (new_q, _) = self + .condvar_write + .wait_timeout(q, std::time::Duration::from_millis(50)) + .unwrap(); + q = new_q; + continue; + } + let take = (MAX_BUF_BYTES - q.len()).min(bytes.len() - offset); + q.extend(&bytes[offset..offset + take]); + offset += take; + self.condvar_read.notify_all(); + } + } + + fn wait_init(&self, cancel: &Arc) { + let mut q = self.queue.lock().unwrap(); + while q.is_empty() && !self.done.load(Ordering::Acquire) && !cancel.load(Ordering::Acquire) + { + let (new_q, _) = self + .condvar_read + .wait_timeout(q, std::time::Duration::from_millis(50)) + .unwrap(); + q = new_q; + } + } +} + +struct SegmentStreamSource { + buf: Arc, + cancel: Arc, +} + +impl Read for SegmentStreamSource { + fn read(&mut self, out: &mut [u8]) -> io::Result { + let mut q = self.buf.queue.lock().unwrap(); + loop { + if self.cancel.load(Ordering::Acquire) { + return Err(io::Error::new( + io::ErrorKind::Interrupted, + "stream cancelled", + )); + } + + if !q.is_empty() { + let n = q.len().min(out.len()); + let (s1, s2) = q.as_slices(); + let n1 = s1.len().min(n); + out[..n1].copy_from_slice(&s1[..n1]); + let n2 = n - n1; + if n2 > 0 { + out[n1..n].copy_from_slice(&s2[..n2]); + } + q.drain(..n); + self.buf.condvar_write.notify_all(); + return Ok(n); + } + + if self.buf.done.load(Ordering::Acquire) { + return Ok(0); // EOF + } + + let (new_q, _) = self + .buf + .condvar_read + .wait_timeout(q, std::time::Duration::from_millis(50)) + .unwrap(); + q = new_q; + } + } +} + +impl Seek for SegmentStreamSource { + fn seek(&mut self, _pos: SeekFrom) -> io::Result { + Err(io::Error::new( + io::ErrorKind::Unsupported, + "Live streams cannot be seeked directly.", + )) + } +} + +impl MediaSource for SegmentStreamSource { + fn is_seekable(&self) -> bool { + false + } + fn byte_len(&self) -> Option { + None + } +} + +fn http_client() -> &'static reqwest::blocking::Client { + static CLIENT: OnceLock = OnceLock::new(); + CLIENT.get_or_init(|| reqwest::blocking::Client::new()) +} + +fn fetch_segment(url: &str, cancel: &Arc) -> Option> { + for attempt in 0..3 { + if cancel.load(Ordering::Acquire) { + return None; + } + if attempt > 0 { + std::thread::sleep(std::time::Duration::from_millis(500 * attempt)); + } + + match http_client().get(url).send() { + Ok(mut r) if r.status().is_success() => { + let mut data = + Vec::with_capacity(r.content_length().unwrap_or(1024 * 1024) as usize); + let mut buf = [0u8; 32 * 1024]; + let mut ok = true; + loop { + if cancel.load(Ordering::Acquire) { + return None; + } + match std::io::Read::read(&mut r, &mut buf) { + Ok(0) => break, + Ok(n) => data.extend_from_slice(&buf[..n]), + Err(e) => { + eprintln!("[seg] chunk read error: {e}"); + ok = false; + break; + } + } + } + if ok { + return Some(data); + } + } + Ok(r) => eprintln!("[seg] HTTP {} on attempt {}", r.status(), attempt), + Err(e) => eprintln!("[seg] fetch error on attempt {}: {}", attempt, e), + } + } + None +} + +fn find_dfla_blocks(data: &[u8]) -> Option> { + let mut pos = 0usize; + while pos + 8 <= data.len() { + let size = match data[pos..pos + 4].try_into().ok().map(u32::from_be_bytes) { + Some(s) if s >= 8 && pos + s as usize <= data.len() => s as usize, + _ => break, + }; + let t = &data[pos + 4..pos + 8]; + + if t == b"dfLa" { + let body = &data[pos + 8..pos + size]; + if body.len() > 4 { + return Some(body[4..].to_vec()); + } + } + + let inner = match t { + b"moov" | b"trak" | b"mdia" | b"minf" | b"stbl" => Some(&data[pos + 8..pos + size]), + b"stsd" => data.get(pos + 16..pos + size), + b"fLaC" => data.get(pos + 36..pos + size), + _ => None, + }; + if let Some(inner) = inner { + if let Some(r) = find_dfla_blocks(inner) { + return Some(r); + } + } + pos += size; + } + None +} + +fn extract_flac_header(init_data: &[u8]) -> Option> { + let blocks = find_dfla_blocks(init_data)?; + let mut out = Vec::with_capacity(4 + blocks.len()); + out.extend_from_slice(b"fLaC"); + out.extend_from_slice(&blocks); + Some(out) +} + +const QBZ1_UUID: [u8; 16] = [ + 0x3b, 0x42, 0x12, 0x92, 0x56, 0xf3, 0x5f, 0x75, 0x92, 0x36, 0x63, 0xb6, 0x9a, 0x1f, 0x52, 0xb2, +]; + +fn read_u32_be(data: &[u8], offset: usize) -> u32 { + u32::from_be_bytes(data[offset..offset + 4].try_into().unwrap()) +} +fn read_u24_be(data: &[u8], offset: usize) -> u32 { + ((data[offset] as u32) << 16) | ((data[offset + 1] as u32) << 8) | (data[offset + 2] as u32) +} + +fn decrypt_and_extract_frames(data: &mut [u8], key: Option<&[u8; 16]>) -> Vec { + let mut frames = Vec::new(); + let mut pos = 0usize; + while pos + 8 <= data.len() { + let box_size = read_u32_be(data, pos) as usize; + if box_size < 8 || pos + box_size > data.len() { + break; + } + + if &data[pos + 4..pos + 8] == b"uuid" && box_size >= 36 { + if &data[pos + 8..pos + 24] == QBZ1_UUID { + let body = pos + 24; + if body + 12 > data.len() { + pos += box_size; + continue; + } + + let raw_offset = read_u32_be(data, body + 4) as usize; + let num_samples = read_u24_be(data, body + 9) as usize; + let sample_data_start = pos + raw_offset; + let table_start = body + 12; + + let mut offset = sample_data_start; + for i in 0..num_samples { + let e = table_start + i * 16; + if e + 16 > data.len() { + break; + } + let size = read_u32_be(data, e) as usize; + let enc = u16::from_be_bytes([data[e + 6], data[e + 7]]) != 0; + + let end = offset + size; + if end <= data.len() { + if enc && key.is_some() { + let mut iv = [0u8; 16]; + iv[..8].copy_from_slice(&data[e + 8..e + 16]); + Ctr128BE::::new(key.unwrap().into(), (&iv).into()) + .apply_keystream(&mut data[offset..end]); + } + frames.extend_from_slice(&data[offset..end]); + } + offset += size; + } + } + } + pos += box_size; + } + frames +} + +fn parse_key(encryption_key: Option<&str>) -> Option<[u8; 16]> { + match encryption_key { + Some(hex_str) => { + let b = hex::decode(hex_str).ok()?; + if b.len() != 16 { + return None; + } + Some(b.try_into().unwrap()) + } + None => None, + } +} + +// Exposed externally to let C++ kickstart the download process via lib.rs prefetch request +pub fn start_prefetch( + url_template: String, + n_segments: u32, + encryption_key: Option<&str>, + start_segment: usize, +) -> PrefetchData { + let cancel = Arc::new(AtomicBool::new(false)); + let buf = Arc::new(SharedBuf::new()); + let is_flac_flag = Arc::new(AtomicBool::new(false)); + let mut initial_skip_secs = 0.0; + let key = parse_key(encryption_key); + + let cancel_clone = Arc::clone(&cancel); + let buf_clone = Arc::clone(&buf); + let is_flac_flag_clone = Arc::clone(&is_flac_flag); + + if key.is_none() && start_segment > 1 { + initial_skip_secs = 10.0; + } + + std::thread::spawn(move || { + let init_url = url_template.replace("$SEGMENT$", "0"); + if let Some(init) = fetch_segment(&init_url, &cancel_clone) { + if let Some(header) = extract_flac_header(&init) { + eprintln!("[seg] FLAC header detected"); + buf_clone.append(&header, &cancel_clone); + is_flac_flag_clone.store(true, Ordering::Release); + } else { + eprintln!("[seg] Assuming raw MP3 stream"); + } + } + + let start = start_segment.max(1).min(n_segments as usize); + + // NEW HACK: Prepend Segment 1 for unencrypted MP4 streams if seeking past segment 1 + if key.is_none() && start > 1 { + let url1 = url_template.replace("$SEGMENT$", "1"); + eprintln!( + "[seg] fetching INIT segment 1 for unencrypted MP4: {}", + url1 + ); + if let Some(init_seg) = fetch_segment(&url1, &cancel_clone) { + buf_clone.append(&init_seg, &cancel_clone); + } + } + eprintln!( + "[seg] prefetch: start={}, n_segments={}, total segments to fetch={}", + start, + n_segments, + n_segments as usize - start + 1 + ); + + for seg in start..=n_segments as usize { + if cancel_clone.load(Ordering::Acquire) { + break; + } + let url = url_template.replace("$SEGMENT$", &seg.to_string()); + eprintln!("[seg] fetching segment {}: {}", seg, url); + let mut data = match fetch_segment(&url, &cancel_clone) { + Some(d) => d, + None => { + eprintln!( + "[seg] segment {} fetch failed permanently, aborting stream", + seg + ); + break; + } + }; + + let frames = decrypt_and_extract_frames(&mut data, key.as_ref()); + buf_clone.append(&frames, &cancel_clone); + } + + buf_clone.done.store(true, Ordering::Release); + buf_clone.condvar_read.notify_all(); + eprintln!("[seg] download thread done"); + }); + + PrefetchData { + buf, + is_flac_flag, + cancel, + initial_skip_secs, + } +} + +// ───────────────────────────────────────────────────────────────────────────── + +pub fn play_track_inline( + info: TrackInfo, + status: &PlayerStatus, + paused: &Arc, + audio_output: &mut Option, + cmd_rx: &std::sync::mpsc::Receiver, +) -> Result> { + eprintln!( + "[decoder] play_track_inline: n_segments={}, encrypted={}, url={}", + info.n_segments, + info.encryption_key.is_some(), + info.url + ); + + if info.n_segments > 0 { + play_segmented(info, status, paused, audio_output, cmd_rx) + } else { + play_plain(info, status, paused, audio_output, cmd_rx) + } +} + +// ── Segmented (qbz-1 FLAC/AAC) playback ───────────────────────────────────────── + +fn play_segmented( + mut info: TrackInfo, + status: &PlayerStatus, + paused: &Arc, + audio_output: &mut Option, + cmd_rx: &std::sync::mpsc::Receiver, +) -> Result> { + // Qobuz explicitly chunks into 10-second segments, with the last segment being the remainder. + // (Averaging duration/n_segments incorrectly calculates ~9.7s, leading to EOF skipping when seeking near the end) + let segment_duration_secs: f64 = 10.0; + eprintln!( + "[seg] track duration={:?}, n_segments={}, segment_duration_secs={}", + info.track.duration, info.n_segments, segment_duration_secs + ); + + let mut start_segment: usize = 1; + let mut skip_samples: u64 = 0; + let mut stopped = false; + let mut prefetched_next: Option = None; + + // Use the passed-in prefetch buffer if available! + let mut current_prefetch = info.prefetch_data.take(); + + 'restart: loop { + let cancel; + let buf; + let is_flac_flag; + + if let Some(pd) = current_prefetch.take() { + eprintln!("[seg] Instantly connecting to PRE-FETCHED audio buffer in RAM!"); + cancel = pd.cancel; + buf = pd.buf; + is_flac_flag = pd.is_flac_flag; + } else { + eprintln!("[seg] starting prefetch from segment {}", start_segment); + let p_data = start_prefetch( + info.url.clone(), + info.n_segments, + info.encryption_key.as_deref(), + start_segment, + ); + cancel = p_data.cancel; + buf = p_data.buf; + is_flac_flag = p_data.is_flac_flag; + if p_data.initial_skip_secs > 0.0 { + // We prepended segment 1, so skip its audio length + skip_samples += (p_data.initial_skip_secs * 44100.0) as u64; + } + } + + buf.wait_init(&cancel); + + if cancel.load(Ordering::Acquire) { + if stopped || prefetched_next.is_some() { + break 'restart; + } + continue 'restart; + } + + let source = SegmentStreamSource { + buf: Arc::clone(&buf), + cancel: Arc::clone(&cancel), + }; + let mss = MediaSourceStream::new(Box::new(source), Default::default()); + + let mut hint = Hint::new(); + if is_flac_flag.load(Ordering::Acquire) { + hint.with_extension("flac"); + } else { + hint.with_extension("mp3"); + } + + let probed = match symphonia::default::get_probe().format( + &hint, + mss, + &FormatOptions::default(), + &MetadataOptions::default(), + ) { + Ok(p) => p, + Err(e) => { + eprintln!("[seg] probe failed: {e}"); + cancel.store(true, Ordering::Release); + break 'restart; + } + }; + + let mut format = probed.format; + let track = match format + .tracks() + .iter() + .find(|t| t.codec_params.codec != symphonia::core::codecs::CODEC_TYPE_NULL) + { + Some(t) => t.clone(), + None => { + cancel.store(true, Ordering::Release); + break 'restart; + } + }; + + let track_id = track.id; + let sample_rate = track.codec_params.sample_rate.unwrap_or(44100); + let channels = track.codec_params.channels.map(|c| c.count()).unwrap_or(2); + + let mut decoder = match symphonia::default::get_codecs() + .make(&track.codec_params, &DecoderOptions::default()) + { + Ok(d) => d, + Err(e) => { + eprintln!("[seg] decoder init failed: {e}"); + cancel.store(true, Ordering::Release); + break 'restart; + } + }; + + if let Some(ao) = audio_output.as_ref() { + if ao.sample_rate != sample_rate || ao.channels != channels { + *audio_output = None; + } + } + if audio_output.is_none() { + match AudioOutput::try_open(sample_rate, channels) { + Ok(ao) => *audio_output = Some(ao), + Err(e) => { + eprintln!("[seg] audio open failed: {e}"); + cancel.store(true, Ordering::Release); + break 'restart; + } + } + } + let ao = audio_output.as_mut().unwrap(); + + let mut samples_to_skip = skip_samples * channels as u64; + let segment_offset_secs = + ((start_segment.saturating_sub(1)) as f64 * segment_duration_secs).round() as u64; + eprintln!( + "[seg] segment_offset_secs={}, samples_to_skip={}", + segment_offset_secs, samples_to_skip + ); + let mut total_samples_decoded: u64 = 0; + + 'decode: loop { + loop { + // 1. Process pending commands + loop { + match cmd_rx.try_recv() { + Ok(PlayerCommand::Pause) => { + paused.store(true, Ordering::SeqCst); + *status.state.lock().unwrap() = super::PlayerState::Paused; + } + Ok(PlayerCommand::Resume) => { + paused.store(false, Ordering::SeqCst); + *status.state.lock().unwrap() = super::PlayerState::Playing; + } + Ok(PlayerCommand::SetVolume(v)) => { + status.volume.store(v, Ordering::Relaxed); + } + Ok(PlayerCommand::Stop) => { + paused.store(false, Ordering::SeqCst); + *status.state.lock().unwrap() = super::PlayerState::Idle; + *status.current_track.lock().unwrap() = None; + status.position_secs.store(0, Ordering::Relaxed); + status.duration_secs.store(0, Ordering::Relaxed); + stopped = true; + cancel.store(true, Ordering::Release); + break 'decode; + } + Ok(PlayerCommand::Play(new_info)) => { + prefetched_next = Some(NextAction::Play(new_info)); + cancel.store(true, Ordering::Release); + break 'decode; + } + Ok(PlayerCommand::QueueNext(new_info)) => { + prefetched_next = Some(NextAction::Transition(new_info)); + } + Err(std::sync::mpsc::TryRecvError::Empty) => break, + Err(std::sync::mpsc::TryRecvError::Disconnected) => { + stopped = true; + cancel.store(true, Ordering::Release); + break 'decode; + } + } + } + + if stopped { + break 'decode; + } + + // 2. Process Seeks + if status.seek_requested.load(Ordering::SeqCst) { + status.seek_requested.store(false, Ordering::SeqCst); + let target_secs = status.seek_target_secs.load(Ordering::Relaxed) as f64; + status + .position_secs + .store(target_secs as u64, Ordering::Relaxed); + + eprintln!( + "[seg] seek requested: {}s, segment_duration={}s, n_segments={}", + target_secs, segment_duration_secs, info.n_segments + ); + cancel.store(true, Ordering::Release); + start_segment = ((target_secs / segment_duration_secs).floor() as usize + 1) + .max(1) + .min(info.n_segments as usize); + let sub_sec = target_secs % segment_duration_secs; + skip_samples = (sub_sec * sample_rate as f64) as u64; + eprintln!( + "[seg] start_segment={}, skip_samples={}", + start_segment, skip_samples + ); + break 'decode; + } + + // 3. Pause sleep cycle + if paused.load(Ordering::SeqCst) { + std::thread::sleep(std::time::Duration::from_millis(10)); + continue; + } + + break; // Event loop complete, proceed to packet decoding + } + + let packet = match format.next_packet() { + Ok(p) => p, + Err(SymphoniaError::IoError(e)) + if e.kind() == std::io::ErrorKind::UnexpectedEof => + { + break 'decode; // Natural EOF! Return prefetched track instantly + } + Err(SymphoniaError::IoError(e)) if e.kind() == std::io::ErrorKind::Interrupted => { + break 'decode; + } + Err(SymphoniaError::ResetRequired) => { + decoder.reset(); + continue; + } + Err(e) => { + eprintln!("[seg] format error: {e}"); + break 'decode; + } + }; + + if packet.track_id() != track_id { + continue; + } + + let decoded = match decoder.decode(&packet) { + Ok(d) => d, + Err(SymphoniaError::IoError(_)) => break 'decode, + Err(_) => continue, + }; + + let frame_samples = decoded.frames() as u64; + total_samples_decoded += frame_samples; + + let current_session_secs = total_samples_decoded / sample_rate as u64; + status.position_secs.store( + segment_offset_secs + current_session_secs, + Ordering::Relaxed, + ); + + let volume = status.volume.load(Ordering::Relaxed) as f32 / 100.0; + let rg = *status.replaygain_gain.lock().unwrap(); + let gain = (volume * rg).min(1.0); + + if samples_to_skip > 0 { + let mut sbuf = SampleBuffer::::new(decoded.capacity() as u64, *decoded.spec()); + sbuf.copy_interleaved_ref(decoded); + let raw = sbuf.samples(); + let frame_samples = raw.len() as u64; + if frame_samples <= samples_to_skip { + samples_to_skip -= frame_samples; + continue; + } + let skip_off = samples_to_skip as usize; + samples_to_skip = 0; + let samples: Vec = raw[skip_off..].iter().map(|s| s * gain).collect(); + ao.write_samples(&samples, &cancel)?; + } else { + ao.write(decoded, gain, &cancel)?; + } + } // end 'decode + + if stopped { + break 'restart; + } + + // If we broke 'decode natively (cancel is false), it's a natural EOF. + if !cancel.load(Ordering::Acquire) { + break 'restart; + } + + // If we cancelled because of a hard Play() command, break immediately. + if let Some(NextAction::Play(_)) = prefetched_next { + break 'restart; + } + + // Otherwise, we cancelled because of a Seek! + // We just continue decoding the current track. Any NextAction::Transition remains queued safely! + continue 'restart; + } // end 'restart + + if stopped { + *audio_output = None; + } + + Ok(prefetched_next) +} + +// ── Plain HTTP streaming source (non-segmented tracks) ─────────────────────── + const HEAD_SIZE: usize = 512 * 1024; struct HttpStreamSource { + url: String, reader: reqwest::blocking::Response, head: Vec, reader_pos: u64, @@ -30,8 +748,13 @@ struct HttpStreamSource { } impl HttpStreamSource { - fn new(response: reqwest::blocking::Response, content_length: Option) -> Self { + fn new( + url: String, + response: reqwest::blocking::Response, + content_length: Option, + ) -> Self { Self { + url, reader: response, head: Vec::new(), reader_pos: 0, @@ -83,39 +806,86 @@ impl Seek for HttpStreamSource { return Ok(self.pos); } - if target < self.reader_pos { - if target < self.head.len() as u64 { - self.pos = target; - return Ok(self.pos); - } - return Err(io::Error::new( - io::ErrorKind::InvalidInput, - "backward seek past head buffer", - )); + if target < self.head.len() as u64 { + self.pos = target; + return Ok(self.pos); } - let mut remaining = target - self.reader_pos; - while remaining > 0 { - let mut discard = [0u8; 8192]; - let want = (remaining as usize).min(discard.len()); - match self.reader.read(&mut discard[..want]) { - Ok(0) => break, - Ok(n) => { - if self.reader_pos < HEAD_SIZE as u64 { - let capacity = HEAD_SIZE.saturating_sub(self.head.len()); - let to_buf = n.min(capacity); - if to_buf > 0 { - self.head.extend_from_slice(&discard[..to_buf]); + let is_small_skip = target > self.reader_pos && (target - self.reader_pos) < 1024 * 1024; + + if is_small_skip { + let mut remaining = target - self.reader_pos; + while remaining > 0 { + let mut discard = [0u8; 8192]; + let want = (remaining as usize).min(discard.len()); + match self.reader.read(&mut discard[..want]) { + Ok(0) => break, + Ok(n) => { + if self.reader_pos < HEAD_SIZE as u64 { + let capacity = HEAD_SIZE.saturating_sub(self.head.len()); + let to_buf = n.min(capacity); + if to_buf > 0 { + self.head.extend_from_slice(&discard[..to_buf]); + } } + self.reader_pos += n as u64; + remaining -= n as u64; } - self.reader_pos += n as u64; - remaining -= n as u64; + Err(e) => return Err(e), } - Err(e) => return Err(e), } + self.pos = self.reader_pos; + return Ok(self.pos); + } + + // Large skip or backward skip - use Range HTTP request + let resp = match http_client() + .get(&self.url) + .header("Range", format!("bytes={}-", target)) + .send() + { + Ok(r) => r, + Err(e) => return Err(io::Error::new(io::ErrorKind::Other, e.to_string())), + }; + + if resp.status() == reqwest::StatusCode::PARTIAL_CONTENT { + self.reader = resp; + self.reader_pos = target; + self.pos = target; + Ok(self.pos) + } else if resp.status().is_success() { + // Server ignored Range header. Draining to target. + self.reader = resp; + self.reader_pos = 0; + self.pos = 0; + let mut remaining = target; + while remaining > 0 { + let mut discard = [0u8; 8192]; + let want = (remaining as usize).min(discard.len()); + match self.reader.read(&mut discard[..want]) { + Ok(0) => break, + Ok(n) => { + if self.reader_pos < HEAD_SIZE as u64 { + let capacity = HEAD_SIZE.saturating_sub(self.head.len()); + let to_buf = n.min(capacity); + if to_buf > 0 { + self.head.extend_from_slice(&discard[..to_buf]); + } + } + self.reader_pos += n as u64; + remaining -= n as u64; + } + Err(e) => return Err(e), + } + } + self.pos = self.reader_pos; + Ok(self.pos) + } else { + Err(io::Error::new( + io::ErrorKind::Other, + format!("HTTP Error {}", resp.status()), + )) } - self.pos = self.reader_pos; - Ok(self.pos) } } @@ -128,30 +898,52 @@ impl MediaSource for HttpStreamSource { } } -/// Decode and play `url` inline on the calling thread (the player loop). -/// -/// `audio_output` is reused across calls if the sample rate and channel count match, -/// keeping the CPAL stream open between tracks for gapless playback. -/// -/// Returns: -/// - `Ok(Some(TrackInfo))` — a new Play command arrived; start that track next. -/// - `Ok(None)` — track finished naturally or was stopped. -/// - `Err(_)` — unrecoverable playback error. -pub fn play_track_inline( - url: &str, +// ── Plain (non-segmented) playback ─────────────────────────────────────────── + +fn play_plain( + info: TrackInfo, status: &PlayerStatus, paused: &Arc, audio_output: &mut Option, cmd_rx: &std::sync::mpsc::Receiver, -) -> Result> { - let response = reqwest::blocking::get(url)?; - let content_length = response.content_length(); - let source = HttpStreamSource::new(response, content_length); +) -> Result> { + eprintln!("[decoder] using plain URL streaming"); + let (response, content_length) = { + let mut last_err = String::new(); + let mut result = None; + for attempt in 0..3u32 { + if attempt > 0 { + std::thread::sleep(std::time::Duration::from_millis(500 * attempt as u64)); + } + match http_client().get(&info.url).send() { + Ok(resp) => { + if resp.status().is_success() { + let cl = resp.content_length(); + result = Some((resp, cl)); + break; + } else { + last_err = format!("HTTP {}", resp.status()); + } + } + Err(e) => { + last_err = e.to_string(); + } + } + } + result.ok_or_else(|| anyhow::anyhow!("{last_err}"))? + }; + + let source = HttpStreamSource::new(info.url.clone(), response, content_length); let mss = MediaSourceStream::new(Box::new(source), Default::default()); let hint = Hint::new(); let probed = symphonia::default::get_probe() - .format(&hint, mss, &FormatOptions::default(), &MetadataOptions::default()) + .format( + &hint, + mss, + &FormatOptions::default(), + &MetadataOptions::default(), + ) .map_err(|e| anyhow::anyhow!("probe failed: {e}"))?; let mut format = probed.format; @@ -159,7 +951,7 @@ pub fn play_track_inline( .tracks() .iter() .find(|t| t.codec_params.codec != symphonia::core::codecs::CODEC_TYPE_NULL) - .ok_or_else(|| anyhow::anyhow!("no audio tracks"))? + .unwrap() .clone(); let track_id = track.id; @@ -168,147 +960,118 @@ pub fn play_track_inline( let mut decoder = symphonia::default::get_codecs() .make(&track.codec_params, &DecoderOptions::default()) - .map_err(|e| anyhow::anyhow!("decoder init failed: {e}"))?; + .unwrap(); - // Reuse existing audio output if format matches; rebuild only on format change. if let Some(ao) = audio_output.as_ref() { if ao.sample_rate != sample_rate || ao.channels != channels { - *audio_output = None; // will be recreated below + *audio_output = None; } } if audio_output.is_none() { - let mut ao = AudioOutput::try_open(sample_rate, channels)?; - ao.set_viz_producer(status.viz_ring.producer()); - *audio_output = Some(ao); + *audio_output = Some(AudioOutput::try_open(sample_rate, channels)?); } - status.viz_sample_rate.store(sample_rate, Ordering::Relaxed); - status.viz_channels.store(channels as u32, Ordering::Relaxed); let ao = audio_output.as_mut().unwrap(); + let stop_flag = Arc::new(AtomicBool::new(false)); let mut stopped = false; - let mut next_track: Option = None; + let mut prefetched_next: Option = None; 'decode: loop { - // Non-blocking command check — handle Pause/Resume/Seek/Stop/Play loop { - match cmd_rx.try_recv() { - Ok(PlayerCommand::Pause) => { - paused.store(true, Ordering::SeqCst); - *status.state.lock().unwrap() = super::PlayerState::Paused; - } - Ok(PlayerCommand::Resume) => { - paused.store(false, Ordering::SeqCst); - *status.state.lock().unwrap() = super::PlayerState::Playing; - } - Ok(PlayerCommand::SetVolume(v)) => { - status.volume.store(v, Ordering::Relaxed); - } - Ok(PlayerCommand::Stop) => { - paused.store(false, Ordering::SeqCst); - *status.state.lock().unwrap() = super::PlayerState::Idle; - *status.current_track.lock().unwrap() = None; - status.position_secs.store(0, Ordering::Relaxed); - status.duration_secs.store(0, Ordering::Relaxed); - stopped = true; - break 'decode; - } - Ok(PlayerCommand::Play(info)) => { - // New track requested — stop current and return it - next_track = Some(info); - break 'decode; - } - Err(std::sync::mpsc::TryRecvError::Empty) => break, - Err(std::sync::mpsc::TryRecvError::Disconnected) => { - stopped = true; - break 'decode; + loop { + match cmd_rx.try_recv() { + Ok(PlayerCommand::Pause) => { + paused.store(true, Ordering::SeqCst); + *status.state.lock().unwrap() = super::PlayerState::Paused; + } + Ok(PlayerCommand::Resume) => { + paused.store(false, Ordering::SeqCst); + *status.state.lock().unwrap() = super::PlayerState::Playing; + } + Ok(PlayerCommand::Stop) => { + paused.store(false, Ordering::SeqCst); + *status.state.lock().unwrap() = super::PlayerState::Idle; + *status.current_track.lock().unwrap() = None; + status.position_secs.store(0, Ordering::Relaxed); + status.duration_secs.store(0, Ordering::Relaxed); + stopped = true; + stop_flag.store(true, Ordering::SeqCst); + break 'decode; + } + Ok(PlayerCommand::Play(info)) => { + paused.store(false, Ordering::SeqCst); + prefetched_next = Some(NextAction::Play(info)); + stop_flag.store(true, Ordering::SeqCst); + break 'decode; + } + Ok(PlayerCommand::QueueNext(info)) => { + prefetched_next = Some(NextAction::Transition(info)); + } + Ok(PlayerCommand::SetVolume(v)) => { + status.volume.store(v, Ordering::Relaxed); + } + Err(std::sync::mpsc::TryRecvError::Empty) => break, + Err(_) => { + stopped = true; + break 'decode; + } } } - } - // Spin while paused, but keep checking for commands - while paused.load(Ordering::SeqCst) { - std::thread::sleep(std::time::Duration::from_millis(10)); - // Still check for Stop/Play while paused - match cmd_rx.try_recv() { - Ok(PlayerCommand::Resume) => { - paused.store(false, Ordering::SeqCst); - *status.state.lock().unwrap() = super::PlayerState::Playing; - } - Ok(PlayerCommand::Stop) => { - paused.store(false, Ordering::SeqCst); - stopped = true; - break; - } - Ok(PlayerCommand::Play(info)) => { - paused.store(false, Ordering::SeqCst); - next_track = Some(info); - break 'decode; - } - Ok(PlayerCommand::SetVolume(v)) => { - status.volume.store(v, Ordering::Relaxed); - } - _ => {} + if stopped { + break 'decode; } - if stopped { break 'decode; } - } - if stopped { break; } - // Handle seek - if status.seek_requested.load(Ordering::SeqCst) { - status.seek_requested.store(false, Ordering::SeqCst); - let target = status.seek_target_secs.load(Ordering::Relaxed); - let seeked = format.seek( - SeekMode::Coarse, - SeekTo::Time { time: Time::from(target), track_id: None }, - ); - if let Ok(s) = seeked { - let actual = s.actual_ts / sample_rate as u64; - status.position_secs.store(actual, Ordering::Relaxed); + if status.seek_requested.load(Ordering::SeqCst) { + status.seek_requested.store(false, Ordering::SeqCst); + let target = status.seek_target_secs.load(Ordering::Relaxed); + status.position_secs.store(target, Ordering::Relaxed); + + let seeked = format.seek( + SeekMode::Coarse, + SeekTo::Time { + time: Time::from(target), + track_id: None, + }, + ); + if let Ok(s) = seeked { + let actual = s.actual_ts / sample_rate as u64; + status.position_secs.store(actual, Ordering::Relaxed); + } + decoder.reset(); + // Just completed a seek. We must continue 'decode loop, which starts by checking events! + continue 'decode; } - decoder.reset(); - continue; - } + + if paused.load(Ordering::SeqCst) { + std::thread::sleep(std::time::Duration::from_millis(10)); + continue; + } + + break; // break the event loop to read a packet + } // End event loop let packet = match format.next_packet() { Ok(p) => p, - Err(SymphoniaError::IoError(e)) - if e.kind() == std::io::ErrorKind::UnexpectedEof => - { - break; // natural end of track - } - Err(SymphoniaError::ResetRequired) => { - decoder.reset(); - continue; - } - Err(e) => return Err(anyhow::anyhow!("format error: {e}")), + Err(_) => break 'decode, // EOF }; if packet.track_id() != track_id { continue; } - if let Some(ts) = packet.ts().checked_div(sample_rate as u64) { status.position_secs.store(ts, Ordering::Relaxed); } - match decoder.decode(&packet) { - Ok(decoded) => { - let volume = status.volume.load(Ordering::Relaxed) as f32 / 100.0; - let rg = *status.replaygain_gain.lock().unwrap(); - // Use a stop flag tied to new-track-incoming so write doesn't block - let dummy_stop = Arc::new(AtomicBool::new(false)); - ao.write(decoded, (volume * rg).min(1.0), &dummy_stop)?; - } - Err(SymphoniaError::IoError(_)) => break, - Err(SymphoniaError::DecodeError(e)) => eprintln!("decode error: {e}"), - Err(e) => return Err(anyhow::anyhow!("decode error: {e}")), + if let Ok(decoded) = decoder.decode(&packet) { + let volume = status.volume.load(Ordering::Relaxed) as f32 / 100.0; + let rg = *status.replaygain_gain.lock().unwrap(); + let _ = ao.write(decoded, (volume * rg).min(1.0), &stop_flag); } } if stopped { - // On explicit stop, drop the audio output to silence immediately *audio_output = None; } - - Ok(next_track) + Ok(prefetched_next) } diff --git a/rust/src/player/mod.rs b/rust/src/player/mod.rs index 1dc67a4..64e9b3f 100644 --- a/rust/src/player/mod.rs +++ b/rust/src/player/mod.rs @@ -1,33 +1,33 @@ -mod decoder; +pub mod decoder; pub mod output; -use rb::{SpscRb, RB}; use std::sync::{ - atomic::{AtomicBool, AtomicU32, AtomicU64, AtomicU8, Ordering}, + atomic::{AtomicBool, AtomicU64, AtomicU8, Ordering}, Arc, }; use std::time::Duration; use crate::api::TrackDto; +use decoder::NextAction; -/// Size of the visualizer ring buffer in f32 samples (~180ms at 44.1kHz stereo). -const VIZ_RING_SIZE: usize = 16 * 1024; - -#[derive(Debug, Clone)] +#[derive(Clone)] pub enum PlayerCommand { Play(TrackInfo), + QueueNext(TrackInfo), Pause, Resume, Stop, SetVolume(u8), } -#[derive(Debug, Clone)] +#[derive(Clone)] pub struct TrackInfo { pub track: TrackDto, pub url: String, - /// ReplayGain track gain in dB, if enabled and available. + pub n_segments: u32, + pub encryption_key: Option, pub replaygain_db: Option, + pub prefetch_data: Option, } #[derive(Debug, Clone, PartialEq)] @@ -45,26 +45,16 @@ pub struct PlayerStatus { pub duration_secs: Arc, pub volume: Arc, pub current_track: Arc>>, - /// Set to true by the decode thread when a track finishes naturally. pub track_finished: Arc, - /// Set by the player loop when a seek command arrives; cleared by the decode thread. + pub track_transitioned: Arc, pub seek_requested: Arc, pub seek_target_secs: Arc, - /// Linear gain factor to apply (1.0 = unity). Updated each time a new track starts. pub replaygain_gain: Arc>, - /// When false the audio output is torn down after each track, producing a gap. pub gapless: Arc, - /// Visualizer ring buffer (consumer side, read by FFI). - pub viz_ring: Arc>, - pub viz_consumer: Arc>>, - pub viz_sample_rate: Arc, - pub viz_channels: Arc, } impl PlayerStatus { pub fn new() -> Self { - let viz_ring = Arc::new(SpscRb::new(VIZ_RING_SIZE)); - let viz_consumer = Arc::new(std::sync::Mutex::new(viz_ring.consumer())); Self { state: Arc::new(std::sync::Mutex::new(PlayerState::Idle)), position_secs: Arc::new(AtomicU64::new(0)), @@ -72,33 +62,26 @@ impl PlayerStatus { volume: Arc::new(AtomicU8::new(80)), current_track: Arc::new(std::sync::Mutex::new(None)), track_finished: Arc::new(AtomicBool::new(false)), + track_transitioned: Arc::new(AtomicBool::new(false)), seek_requested: Arc::new(AtomicBool::new(false)), seek_target_secs: Arc::new(AtomicU64::new(0)), replaygain_gain: Arc::new(std::sync::Mutex::new(1.0)), gapless: Arc::new(AtomicBool::new(false)), - viz_ring, - viz_consumer, - viz_sample_rate: Arc::new(AtomicU32::new(0)), - viz_channels: Arc::new(AtomicU32::new(0)), } } pub fn get_state(&self) -> PlayerState { self.state.lock().unwrap().clone() } - pub fn get_position(&self) -> u64 { self.position_secs.load(Ordering::Relaxed) } - pub fn get_duration(&self) -> u64 { self.duration_secs.load(Ordering::Relaxed) } - pub fn get_volume(&self) -> u8 { self.volume.load(Ordering::Relaxed) } - } pub struct Player { @@ -122,51 +105,45 @@ impl Player { pub fn send(&self, cmd: PlayerCommand) { self.cmd_tx.send(cmd).ok(); } - pub fn pause(&self) { self.send(PlayerCommand::Pause); } - pub fn resume(&self) { self.send(PlayerCommand::Resume); } - pub fn stop(&self) { self.send(PlayerCommand::Stop); } - pub fn set_volume(&self, vol: u8) { self.status.volume.store(vol, Ordering::Relaxed); self.send(PlayerCommand::SetVolume(vol)); } - pub fn seek(&self, secs: u64) { self.status.seek_target_secs.store(secs, Ordering::Relaxed); self.status.seek_requested.store(true, Ordering::SeqCst); } } -/// The player loop runs on a single dedicated OS thread. -/// It owns the `AudioOutput` locally so there are no Send constraints. -/// Decoding is performed inline; the command channel is polled via try_recv -/// inside the decode loop to handle Pause/Resume/Seek/Stop/Play without -/// tearng down and re-opening the audio device between tracks. fn player_loop(rx: std::sync::mpsc::Receiver, status: PlayerStatus) { use std::sync::mpsc::RecvTimeoutError; let mut audio_output: Option = None; let paused = Arc::new(AtomicBool::new(false)); - // pending_info holds a Play command that interrupted an ongoing decode - let mut pending_info: Option = None; + let mut pending_action: Option = None; 'outer: loop { - // Wait for a Play command (or use one that was interrupted) - let info = if let Some(p) = pending_info.take() { - p + let info = if let Some(action) = pending_action.take() { + match action { + NextAction::Play(info) | NextAction::Transition(info) => info, + } } else { loop { match rx.recv_timeout(Duration::from_millis(100)) { Ok(PlayerCommand::Play(info)) => break info, + Ok(PlayerCommand::QueueNext(info)) => { + // If completely idle and get QueueNext, treat as Play + break info; + } Ok(PlayerCommand::Stop) => { audio_output = None; paused.store(false, Ordering::SeqCst); @@ -178,15 +155,15 @@ fn player_loop(rx: std::sync::mpsc::Receiver, status: PlayerStatu Ok(PlayerCommand::SetVolume(v)) => { status.volume.store(v, Ordering::Relaxed); } - Ok(_) => {} // Pause/Resume ignored when idle + Ok(_) => {} Err(RecvTimeoutError::Timeout) => {} Err(RecvTimeoutError::Disconnected) => break 'outer, } } }; - // Compute ReplayGain factor - let rg_factor = info.replaygain_db + let rg_factor = info + .replaygain_db .map(|db| 10f32.powf(db as f32 / 20.0)) .unwrap_or(1.0); *status.replaygain_gain.lock().unwrap() = rg_factor; @@ -199,13 +176,15 @@ fn player_loop(rx: std::sync::mpsc::Receiver, status: PlayerStatu status.position_secs.store(0, Ordering::Relaxed); paused.store(false, Ordering::SeqCst); - match decoder::play_track_inline(&info.url, &status, &paused, &mut audio_output, &rx) { - Ok(Some(next_info)) => { - // Interrupted by a new Play — loop immediately with reused audio output - pending_info = Some(next_info); + match decoder::play_track_inline(info, &status, &paused, &mut audio_output, &rx) { + Ok(Some(NextAction::Play(next_track))) => { + pending_action = Some(NextAction::Play(next_track)); + } + Ok(Some(NextAction::Transition(next_track))) => { + pending_action = Some(NextAction::Play(next_track)); + status.track_transitioned.store(true, Ordering::SeqCst); } Ok(None) => { - // Track finished naturally — tear down audio output if gapless is off if !status.gapless.load(Ordering::Relaxed) { audio_output = None; } @@ -215,8 +194,6 @@ fn player_loop(rx: std::sync::mpsc::Receiver, status: PlayerStatu Err(e) => { eprintln!("playback error: {e}"); *status.state.lock().unwrap() = PlayerState::Error(e.to_string()); - // Signal track end so the queue advances to the next track - // instead of stalling on an unplayable track. status.track_finished.store(true, Ordering::SeqCst); } } diff --git a/rust/src/player/output.rs b/rust/src/player/output.rs index bdafde8..b0ec8f4 100644 --- a/rust/src/player/output.rs +++ b/rust/src/player/output.rs @@ -14,7 +14,6 @@ const RING_BUFFER_SIZE: usize = 32 * 1024; pub struct AudioOutput { ring_buf_producer: rb::Producer, - viz_producer: Option>, _stream: cpal::Stream, pub sample_rate: u32, pub channels: usize, @@ -52,17 +51,12 @@ impl AudioOutput { Ok(Self { ring_buf_producer: producer, - viz_producer: None, _stream: stream, sample_rate, channels, }) } - pub fn set_viz_producer(&mut self, producer: rb::Producer) { - self.viz_producer = Some(producer); - } - pub fn write( &mut self, decoded: AudioBufferRef<'_>, @@ -75,13 +69,13 @@ impl AudioOutput { ); sample_buf.copy_interleaved_ref(decoded); let samples: Vec = sample_buf.samples().iter().map(|s| s * volume).collect(); + self.write_samples(&samples, stop) + } - // Best-effort copy for visualizer (non-blocking, ok to drop samples) - if let Some(ref mut viz) = self.viz_producer { - let _ = viz.write(&samples); - } - - let mut remaining = &samples[..]; + /// Write pre-converted interleaved f32 samples directly to the ring buffer. + /// Returns early (without error) if `stop` is set. + pub fn write_samples(&mut self, samples: &[f32], stop: &Arc) -> Result<()> { + let mut remaining = samples; while !remaining.is_empty() { if stop.load(Ordering::SeqCst) { return Ok(()); diff --git a/src/backend/qobuzbackend.cpp b/src/backend/qobuzbackend.cpp index 6432bdb..46e438e 100644 --- a/src/backend/qobuzbackend.cpp +++ b/src/backend/qobuzbackend.cpp @@ -209,21 +209,6 @@ quint64 QobuzBackend::duration() const { return qobuz_backend_get_duration(m_bac int QobuzBackend::volume() const { return qobuz_backend_get_volume(m_backend); } int QobuzBackend::state() const { return qobuz_backend_get_state(m_backend); } -quint32 QobuzBackend::vizRead(float *buf, quint32 maxSamples) -{ - return qobuz_backend_viz_read(m_backend, buf, maxSamples); -} - -quint32 QobuzBackend::vizSampleRate() const -{ - return qobuz_backend_viz_sample_rate(m_backend); -} - -quint32 QobuzBackend::vizChannels() const -{ - return qobuz_backend_viz_channels(m_backend); -} - // ---- private slots ---- void QobuzBackend::onPositionTick() @@ -232,6 +217,9 @@ void QobuzBackend::onPositionTick() if (qobuz_backend_take_track_finished(m_backend)) emit trackFinished(); + + if (qobuz_backend_take_track_transitioned(m_backend)) + emit trackTransitioned(); } void QobuzBackend::onEvent(int eventType, const QString &json) diff --git a/src/backend/qobuzbackend.hpp b/src/backend/qobuzbackend.hpp index 0f28584..b13fb6b 100644 --- a/src/backend/qobuzbackend.hpp +++ b/src/backend/qobuzbackend.hpp @@ -73,11 +73,6 @@ public: /// 1 = playing, 2 = paused, 0 = idle int state() const; - // --- visualizer PCM --- - quint32 vizRead(float *buf, quint32 maxSamples); - quint32 vizSampleRate() const; - quint32 vizChannels() const; - signals: // auth void loginSuccess(const QString &token, const QJsonObject &user); @@ -106,6 +101,7 @@ signals: void stateChanged(const QString &state); void positionChanged(quint64 position, quint64 duration); void trackFinished(); + void trackTransitioned(); // errors void error(const QString &message); diff --git a/src/list/tracks.cpp b/src/list/tracks.cpp index 461a4ba..ee1611d 100644 --- a/src/list/tracks.cpp +++ b/src/list/tracks.cpp @@ -189,7 +189,7 @@ void Tracks::onContextMenu(const QPoint &pos) menu.addSeparator(); auto *openAlbum = menu.addAction( QIcon(":/res/icons/view-media-album-cover.svg"), - tr("Open album: %1").arg(m_model->trackAt(index.row()).album)); + tr("Open album: %1").arg(QString(m_model->trackAt(index.row()).album).replace(QLatin1Char('&'), QStringLiteral("&&")))); connect(openAlbum, &QAction::triggered, this, [this, albumId] { m_backend->getAlbum(albumId); }); @@ -202,7 +202,7 @@ void Tracks::onContextMenu(const QPoint &pos) const QString artistName = trackJson["performer"].toObject()["name"].toString(); auto *openArtist = menu.addAction( QIcon(":/res/icons/view-media-artist.svg"), - tr("Open artist: %1").arg(artistName)); + tr("Open artist: %1").arg(QString(artistName).replace(QLatin1Char('&'), QStringLiteral("&&")))); connect(openArtist, &QAction::triggered, this, [this, artistId] { m_backend->getArtist(artistId); }); @@ -216,7 +216,7 @@ void Tracks::onContextMenu(const QPoint &pos) for (const auto &pl : m_userPlaylists) { const qint64 plId = pl.first; const QString plName = pl.second; - auto *act = addToPlMenu->addAction(plName); + auto *act = addToPlMenu->addAction(QString(plName).replace(QLatin1Char('&'), QStringLiteral("&&"))); connect(act, &QAction::triggered, this, [this, id, plId] { emit addToPlaylistRequested(id, plId); }); diff --git a/src/mainwindow.cpp b/src/mainwindow.cpp index cfe260a..80eee57 100644 --- a/src/mainwindow.cpp +++ b/src/mainwindow.cpp @@ -40,8 +40,7 @@ MainWindow::MainWindow(QobuzBackend *backend, QWidget *parent) m_libraryDock->setObjectName(QStringLiteral("libraryDock")); m_libraryDock->setFeatures(QDockWidget::DockWidgetMovable); m_libraryDock->setWidget(m_library); - m_libraryDock->setMinimumWidth(180); - m_library->setFixedWidth(220); + m_libraryDock->setMinimumWidth(150); addDockWidget(Qt::LeftDockWidgetArea, m_libraryDock); // ---- Now-playing context dock (left, below library) ---- @@ -70,6 +69,10 @@ MainWindow::MainWindow(QobuzBackend *backend, QWidget *parent) connect(m_backend, &QobuzBackend::trackFinished, m_scrobbler, &LastFmScrobbler::onTrackFinished); + // Scrobble the finished track during a gapless transition + connect(m_backend, &QobuzBackend::trackTransitioned, + m_scrobbler, &LastFmScrobbler::onTrackFinished); + // ---- Backend signals ---- connect(m_backend, &QobuzBackend::loginSuccess, this, &MainWindow::onLoginSuccess); connect(m_backend, &QobuzBackend::loginError, this, &MainWindow::onLoginError); diff --git a/src/scrobbler/lastfm.hpp b/src/scrobbler/lastfm.hpp index 68bfa7e..fb9f319 100644 --- a/src/scrobbler/lastfm.hpp +++ b/src/scrobbler/lastfm.hpp @@ -78,9 +78,11 @@ public slots: m_artist = track["album"].toObject()["artist"].toObject()["name"].toString(); m_album = track["album"].toObject()["title"].toString(); m_duration = static_cast(track["duration"].toDouble()); - m_startTime = QDateTime::currentSecsSinceEpoch(); - m_playedSecs = 0; - m_scrobbled = false; + m_startTime = QDateTime::currentSecsSinceEpoch(); + m_playedSecs = 0; + m_lastPosition = 0; + m_accumulatedSecs = 0; + m_scrobbled = false; if (!isEnabled() || m_title.isEmpty() || m_duration < 30) return; updateNowPlaying(); @@ -88,18 +90,29 @@ public slots: void onPositionChanged(quint64 positionSecs, quint64 /*duration*/) { + // Accumulate actual listening time to prevent false scrobbles from seeking + if (positionSecs > m_lastPosition && (positionSecs - m_lastPosition) <= 2) { + m_accumulatedSecs += (positionSecs - m_lastPosition); + } + m_lastPosition = positionSecs; m_playedSecs = positionSecs; + if (!isEnabled() || m_scrobbled || m_title.isEmpty() || m_duration < 30) return; + // Scrobble after 50% or 240 seconds played, whichever comes first, min 30 seconds. const quint64 threshold = static_cast(qMin((qint64)240, m_duration / 2)); - if (positionSecs >= 30 && positionSecs >= threshold) + if (m_accumulatedSecs >= 30 && m_accumulatedSecs >= threshold) scrobble(); } void onTrackFinished() { if (!isEnabled() || m_scrobbled || m_title.isEmpty() || m_duration < 30) return; - if (m_playedSecs >= 30) scrobble(); + + const quint64 threshold = static_cast(qMin((qint64)240, m_duration / 2)); + if (m_accumulatedSecs >= 30 && m_accumulatedSecs >= threshold) { + scrobble(); + } } private: @@ -108,10 +121,12 @@ private: QString m_title; QString m_artist; QString m_album; - qint64 m_duration = 0; - qint64 m_startTime = 0; - quint64 m_playedSecs = 0; - bool m_scrobbled = false; + qint64 m_duration = 0; + qint64 m_startTime = 0; + quint64 m_playedSecs = 0; + quint64 m_lastPosition = 0; + quint64 m_accumulatedSecs = 0; + bool m_scrobbled = false; void updateNowPlaying() { @@ -134,16 +149,18 @@ private: { m_scrobbled = true; QMap params; - params["method"] = QStringLiteral("track.scrobble"); - params["api_key"] = AppSettings::instance().lastFmApiKey(); - params["sk"] = AppSettings::instance().lastFmSessionKey(); - params["artist"] = m_artist; - params["track"] = m_title; - params["album"] = m_album; - params["timestamp"] = QString::number(m_startTime); - params["duration"] = QString::number(m_duration); - params["api_sig"] = buildSig(params, AppSettings::instance().lastFmApiSecret()); - params["format"] = QStringLiteral("json"); + params["method"] = QStringLiteral("track.scrobble"); + params["api_key"] = AppSettings::instance().lastFmApiKey(); + params["sk"] = AppSettings::instance().lastFmSessionKey(); + + params["artist[0]"] = m_artist; + params["track[0]"] = m_title; + params["album[0]"] = m_album; + params["timestamp[0]"] = QString::number(m_startTime); + params["duration[0]"] = QString::number(m_duration); + + params["api_sig"] = buildSig(params, AppSettings::instance().lastFmApiSecret()); + params["format"] = QStringLiteral("json"); auto *reply = m_nam->post(apiRequest(), encodeBody(params)); connect(reply, &QNetworkReply::finished, reply, &QNetworkReply::deleteLater); diff --git a/src/view/maintoolbar.cpp b/src/view/maintoolbar.cpp index ed29290..b1ce3a1 100644 --- a/src/view/maintoolbar.cpp +++ b/src/view/maintoolbar.cpp @@ -96,9 +96,11 @@ MainToolBar::MainToolBar(QobuzBackend *backend, PlayQueue *queue, QWidget *paren // ---- Volume ---- m_volume = new VolumeButton(this); - m_volume->setValue(AppSettings::instance().volume()); addWidget(m_volume); connect(m_volume, &VolumeButton::volumeChanged, this, &MainToolBar::onVolumeChanged); + // Set volume after connecting so the backend receives the initial value + m_volume->setValue(AppSettings::instance().volume()); + m_backend->setVolume(AppSettings::instance().volume()); // ---- Queue toggle ---- m_queueBtn = addAction(Icon::queue(), tr("Queue")); diff --git a/src/view/sidepanel/view.cpp b/src/view/sidepanel/view.cpp index 26f98f1..a7e0100 100644 --- a/src/view/sidepanel/view.cpp +++ b/src/view/sidepanel/view.cpp @@ -186,13 +186,13 @@ void SearchTab::onTrackContextMenu(const QPoint &pos) menu.addSeparator(); if (!albumId.isEmpty()) { - auto *openAlbum = menu.addAction(tr("Go to album: %1").arg(albumTitle)); + auto *openAlbum = menu.addAction(tr("Go to album: %1").arg(QString(albumTitle).replace(QLatin1Char('&'), QStringLiteral("&&")))); connect(openAlbum, &QAction::triggered, this, [this, albumId] { emit albumSelected(albumId); }); } if (artistId > 0) { - auto *openArtist = menu.addAction(tr("Go to artist: %1").arg(artistName)); + auto *openArtist = menu.addAction(tr("Go to artist: %1").arg(QString(artistName).replace(QLatin1Char('&'), QStringLiteral("&&")))); connect(openArtist, &QAction::triggered, this, [this, artistId] { emit artistSelected(artistId); }); @@ -252,7 +252,7 @@ void SearchTab::onAlbumContextMenu(const QPoint &pos) const QString artistName = albumJson["artist"].toObject()["name"].toString(); if (artistId > 0) { menu.addSeparator(); - auto *openArtist = menu.addAction(tr("Go to artist: %1").arg(artistName)); + auto *openArtist = menu.addAction(tr("Go to artist: %1").arg(QString(artistName).replace(QLatin1Char('&'), QStringLiteral("&&")))); connect(openArtist, &QAction::triggered, this, [this, artistId] { emit artistSelected(artistId); });