Add mobile chunked streaming with segmented playback fallback

This commit is contained in:
joren
2026-03-31 22:14:15 +02:00
parent bb362686b4
commit 749b0c1aaf
8 changed files with 923 additions and 22 deletions

32
Cargo.lock generated
View File

@@ -196,6 +196,15 @@ dependencies = [
"generic-array", "generic-array",
] ]
[[package]]
name = "block-padding"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a8894febbff9f758034a5b8e12d87918f56dfc64a8e1fe757d65e29041538d93"
dependencies = [
"generic-array",
]
[[package]] [[package]]
name = "bumpalo" name = "bumpalo"
version = "3.20.2" version = "3.20.2"
@@ -220,6 +229,15 @@ version = "1.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e748733b7cbc798e1434b6ac524f0c1ff2ab456fe201501e6497c8417a4fc33" checksum = "1e748733b7cbc798e1434b6ac524f0c1ff2ab456fe201501e6497c8417a4fc33"
[[package]]
name = "cbc"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "26b52a9543ae338f279b96b0b9fed9c8093744685043739079ce85cd58f289a6"
dependencies = [
"cipher",
]
[[package]] [[package]]
name = "cc" name = "cc"
version = "1.2.57" version = "1.2.57"
@@ -750,6 +768,15 @@ version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70"
[[package]]
name = "hkdf"
version = "0.12.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b5f8eb2ad728638ea2c7d47a21db23b7b58a72ed6a38256b8a1849f15fbbdf7"
dependencies = [
"hmac",
]
[[package]] [[package]]
name = "hmac" name = "hmac"
version = "0.12.1" version = "0.12.1"
@@ -1009,6 +1036,7 @@ version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "879f10e63c20629ecabbb64a8010319738c66a5cd0c29b02d63d272b03751d01" checksum = "879f10e63c20629ecabbb64a8010319738c66a5cd0c29b02d63d272b03751d01"
dependencies = [ dependencies = [
"block-padding",
"generic-array", "generic-array",
] ]
@@ -1578,15 +1606,19 @@ dependencies = [
name = "qobuzd" name = "qobuzd"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"aes",
"aes-gcm", "aes-gcm",
"anyhow", "anyhow",
"base64", "base64",
"cbc",
"chrono", "chrono",
"clap", "clap",
"cpal", "cpal",
"ctr",
"directories", "directories",
"futures-util", "futures-util",
"hex", "hex",
"hkdf",
"hmac", "hmac",
"keyring", "keyring",
"md-5", "md-5",

View File

@@ -36,6 +36,10 @@ futures-util = "0.3"
symphonia = { version = "0.5", features = ["flac", "pcm", "mp3", "aac", "ogg", "wav"] } symphonia = { version = "0.5", features = ["flac", "pcm", "mp3", "aac", "ogg", "wav"] }
cpal = "0.15" cpal = "0.15"
rubato = "0.15" rubato = "0.15"
aes = "0.8"
cbc = "0.1"
ctr = "0.9"
hkdf = "0.12"
[profile.release] [profile.release]
strip = true strip = true

View File

@@ -2,8 +2,92 @@ use crate::config::Config;
use crate::crypto; use crate::crypto;
use crate::error::{QobuzError, Result}; use crate::error::{QobuzError, Result};
use crate::types::*; use crate::types::*;
use aes::Aes128;
use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine};
use cbc::cipher::{block_padding::NoPadding, BlockDecryptMut, KeyIvInit};
use hkdf::Hkdf;
use reqwest::Client; use reqwest::Client;
use sha2::Sha256;
use std::time::Duration; use std::time::Duration;
use tracing::warn;
type Aes128CbcDec = cbc::Decryptor<Aes128>;
#[derive(Debug, Clone)]
pub enum TrackStream {
DirectUrl {
url: String,
},
Segmented {
url_template: String,
n_segments: u32,
encryption_key_hex: Option<String>,
},
}
fn b64url_decode(s: &str) -> Result<Vec<u8>> {
URL_SAFE_NO_PAD
.decode(s.trim_end_matches('='))
.map_err(|e| QobuzError::CryptoError(format!("base64url decode failed: {}", e)))
}
fn derive_track_key_hex(
session_infos: &str,
app_secret_hex: &str,
key_field: &str,
) -> Result<String> {
let infos_parts: Vec<&str> = session_infos.splitn(2, '.').collect();
if infos_parts.len() != 2 {
return Err(QobuzError::CryptoError(format!(
"invalid session infos format: {}",
session_infos
)));
}
let salt = b64url_decode(infos_parts[0])?;
let info = b64url_decode(infos_parts[1])?;
let ikm = hex::decode(app_secret_hex)
.map_err(|e| QobuzError::CryptoError(format!("invalid app secret hex: {}", e)))?;
let hk = Hkdf::<Sha256>::new(Some(&salt), &ikm);
let mut kek = [0u8; 16];
hk.expand(&info, &mut kek)
.map_err(|e| QobuzError::CryptoError(format!("HKDF expand failed: {e:?}")))?;
let key_parts: Vec<&str> = key_field.splitn(3, '.').collect();
if key_parts.len() != 3 || key_parts[0] != "qbz-1" {
return Err(QobuzError::CryptoError(format!(
"unexpected key field format: {}",
key_field
)));
}
let ciphertext = b64url_decode(key_parts[1])?;
let iv_bytes = b64url_decode(key_parts[2])?;
if ciphertext.len() < 16 || iv_bytes.len() < 16 {
return Err(QobuzError::CryptoError(format!(
"invalid key field lengths: ciphertext={} iv={}",
ciphertext.len(),
iv_bytes.len()
)));
}
let mut buf = ciphertext;
let iv: [u8; 16] = iv_bytes[..16]
.try_into()
.map_err(|_| QobuzError::CryptoError("invalid IV length".to_string()))?;
let decrypted = Aes128CbcDec::new(&kek.into(), &iv.into())
.decrypt_padded_mut::<NoPadding>(&mut buf)
.map_err(|e| QobuzError::CryptoError(format!("AES-CBC decrypt failed: {e:?}")))?;
if decrypted.len() < 16 {
return Err(QobuzError::CryptoError(
"decrypted track key too short".to_string(),
));
}
Ok(hex::encode(&decrypted[..16]))
}
#[derive(Clone)] #[derive(Clone)]
pub struct QobuzApi { pub struct QobuzApi {
@@ -311,7 +395,110 @@ impl QobuzApi {
Ok(album) Ok(album)
} }
pub async fn get_track_url( async fn start_playback_session(&self, access_token: &str) -> Result<PlaybackSession> {
let timestamp = self.get_timestamp();
let signature =
crypto::generate_request_signature("session/start", &[("profile", "qbz-1")], timestamp);
let url = format!(
"{}/api.json/0.2/session/start?app_id={}&request_ts={}&request_sig={}",
self.base_url, self.app_id, timestamp, signature
);
let response = self
.client
.post(&url)
.headers(self.build_auth_headers(Some(access_token)))
.form(&[("profile", "qbz-1")])
.send()
.await?;
if !response.status().is_success() {
let status = response.status();
let body = response.text().await.unwrap_or_default();
return Err(QobuzError::ApiError(format!(
"Failed to start playback session: {} - {}",
status, body
)));
}
let session: PlaybackSession = response.json().await?;
Ok(session)
}
async fn get_track_stream_mobile(
&self,
access_token: &str,
track_id: &str,
format_id: u32,
) -> Result<TrackStream> {
let session = self.start_playback_session(access_token).await?;
let timestamp = self.get_timestamp();
let format_id_str = format_id.to_string();
let signature = crypto::generate_request_signature(
"file/url",
&[
("format_id", &format_id_str),
("intent", "stream"),
("track_id", track_id),
],
timestamp,
);
let url = format!(
"{}/api.json/0.2/file/url?app_id={}&track_id={}&format_id={}&intent=stream&request_ts={}&request_sig={}",
self.base_url, self.app_id, track_id, format_id, timestamp, signature
);
let mut headers = self.build_auth_headers(Some(access_token));
headers.insert(
"X-Session-Id",
session.session_id.parse().map_err(|e| {
QobuzError::ApiError(format!("Invalid X-Session-Id header value: {}", e))
})?,
);
let response = self.client.get(&url).headers(headers).send().await?;
if !response.status().is_success() {
let status = response.status();
let body = response.text().await.unwrap_or_default();
return Err(QobuzError::ApiError(format!(
"Failed to get mobile file URL: {} - {}",
status, body
)));
}
let mut file: FileUrlResponse = response.json().await?;
if let (Some(key_field), Some(infos)) = (file.key.clone(), session.infos.as_deref()) {
match derive_track_key_hex(infos, &crypto::APP_SECRET, &key_field) {
Ok(unwrapped) => file.key = Some(unwrapped),
Err(e) => {
warn!("Failed to unwrap track key for {}: {}", track_id, e);
file.key = None;
}
}
}
if let Some(url) = file.url {
return Ok(TrackStream::DirectUrl { url });
}
if let (Some(url_template), Some(n_segments)) = (file.url_template, file.n_segments) {
return Ok(TrackStream::Segmented {
url_template,
n_segments,
encryption_key_hex: file.key,
});
}
Err(QobuzError::ApiError(
"Mobile file/url response did not contain url or url_template".to_string(),
))
}
async fn get_track_url_legacy(
&self, &self,
access_token: &str, access_token: &str,
track_id: &str, track_id: &str,
@@ -360,6 +547,47 @@ impl QobuzApi {
Ok(url_response.url) Ok(url_response.url)
} }
pub async fn get_track_stream(
&self,
access_token: &str,
track_id: &str,
format_id: u32,
) -> Result<TrackStream> {
match self
.get_track_stream_mobile(access_token, track_id, format_id)
.await
{
Ok(stream) => Ok(stream),
Err(e) => {
warn!(
"Mobile file/url failed for track {} (format {}), falling back to legacy endpoint: {}",
track_id, format_id, e
);
let url = self
.get_track_url_legacy(access_token, track_id, format_id)
.await?;
Ok(TrackStream::DirectUrl { url })
}
}
}
pub async fn get_track_url(
&self,
access_token: &str,
track_id: &str,
format_id: u32,
) -> Result<String> {
match self
.get_track_stream(access_token, track_id, format_id)
.await?
{
TrackStream::DirectUrl { url } => Ok(url),
TrackStream::Segmented { .. } => Err(QobuzError::ApiError(
"Track uses segmented stream; use get_track_stream instead".to_string(),
)),
}
}
pub async fn get_track(&self, access_token: &str, track_id: &str) -> Result<Track> { pub async fn get_track(&self, access_token: &str, track_id: &str) -> Result<Track> {
let timestamp = self.get_timestamp(); let timestamp = self.get_timestamp();

View File

@@ -1,7 +1,7 @@
use md5::{Digest, Md5}; use md5::{Digest, Md5};
use sha1::Sha1; use sha1::Sha1;
const APP_SECRET: &str = "e79f8b9be485692b0e5f9dd895826368"; pub const APP_SECRET: &str = "e79f8b9be485692b0e5f9dd895826368";
pub fn md5_hash(input: &str) -> String { pub fn md5_hash(input: &str) -> String {
let mut hasher = Md5::new(); let mut hasher = Md5::new();

View File

@@ -5,7 +5,7 @@ use tokio::sync::Mutex;
use tracing::{error, info, Level}; use tracing::{error, info, Level};
use tracing_subscriber::FmtSubscriber; use tracing_subscriber::FmtSubscriber;
use qobuzd::api::QobuzApi; use qobuzd::api::{QobuzApi, TrackStream};
use qobuzd::auth::QobuzAuth; use qobuzd::auth::QobuzAuth;
use qobuzd::config::Config; use qobuzd::config::Config;
use qobuzd::qconnect::QConnect; use qobuzd::qconnect::QConnect;
@@ -178,8 +178,24 @@ async fn main() -> Result<()> {
let token = guard.get_valid_token().await?; let token = guard.get_valid_token().await?;
drop(guard); drop(guard);
let api = QobuzApi::new(&config); let api = QobuzApi::new(&config);
match api.get_track_url(&token, &track_id, format_id).await { match api.get_track_stream(&token, &track_id, format_id).await {
Ok(url) => println!("Stream URL: {}", url), Ok(TrackStream::DirectUrl { url }) => println!("Stream URL: {}", url),
Ok(TrackStream::Segmented {
url_template,
n_segments,
encryption_key_hex,
}) => {
println!("Segmented stream template: {}", url_template);
println!("Segments: {}", n_segments);
println!(
"Encrypted: {}",
if encryption_key_hex.is_some() {
"yes"
} else {
"no"
}
);
}
Err(e) => { Err(e) => {
error!("Failed: {}", e); error!("Failed: {}", e);
std::process::exit(1); std::process::exit(1);

View File

@@ -1,8 +1,12 @@
use std::collections::VecDeque;
use std::io::{self, Read, Seek, SeekFrom}; use std::io::{self, Read, Seek, SeekFrom};
use std::sync::atomic::{AtomicBool, AtomicI32, AtomicU64, AtomicU8, Ordering}; use std::sync::atomic::{AtomicBool, AtomicI32, AtomicU64, AtomicU8, Ordering};
use std::sync::Arc; use std::sync::Arc;
use aes::Aes128;
use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
use ctr::cipher::{KeyIvInit, StreamCipher};
use ctr::Ctr128BE;
use symphonia::core::audio::SampleBuffer; use symphonia::core::audio::SampleBuffer;
use symphonia::core::codecs::DecoderOptions; use symphonia::core::codecs::DecoderOptions;
use symphonia::core::formats::{FormatOptions, SeekMode, SeekTo}; use symphonia::core::formats::{FormatOptions, SeekMode, SeekTo};
@@ -16,7 +20,7 @@ use tracing::{error, info, warn};
#[derive(Debug)] #[derive(Debug)]
pub enum PlayerCommand { pub enum PlayerCommand {
Play { Play {
url: String, stream: StreamSource,
track_id: i32, track_id: i32,
queue_item_id: i32, queue_item_id: i32,
duration_ms: u64, duration_ms: u64,
@@ -29,6 +33,16 @@ pub enum PlayerCommand {
SetVolume(u8), SetVolume(u8),
} }
#[derive(Debug, Clone)]
pub enum StreamSource {
DirectUrl(String),
Segmented {
url_template: String,
n_segments: u32,
encryption_key_hex: Option<String>,
},
}
#[derive(Debug, Clone, Copy, PartialEq)] #[derive(Debug, Clone, Copy, PartialEq)]
pub enum PlayerState { pub enum PlayerState {
Stopped, Stopped,
@@ -64,7 +78,7 @@ pub struct AudioPlayer {
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
struct PlaybackRequest { struct PlaybackRequest {
url: String, stream: StreamSource,
track_id: i32, track_id: i32,
queue_item_id: i32, queue_item_id: i32,
duration_ms: u64, duration_ms: u64,
@@ -94,10 +108,10 @@ fn start_playback(shared: &Arc<SharedState>, req: &PlaybackRequest) {
shared.playing.store(true, Ordering::SeqCst); shared.playing.store(true, Ordering::SeqCst);
let shared_play = shared.clone(); let shared_play = shared.clone();
let url = req.url.clone(); let stream = req.stream.clone();
let start_position_ms = req.start_position_ms; let start_position_ms = req.start_position_ms;
std::thread::spawn(move || { std::thread::spawn(move || {
if let Err(e) = play_stream(&url, shared_play, generation, start_position_ms) { if let Err(e) = play_stream(&stream, shared_play, generation, start_position_ms) {
error!("Playback error: {}", e); error!("Playback error: {}", e);
} }
}); });
@@ -182,14 +196,14 @@ fn player_thread(mut cmd_rx: mpsc::UnboundedReceiver<PlayerCommand>, shared: Arc
match cmd { match cmd {
PlayerCommand::Play { PlayerCommand::Play {
url, stream,
track_id, track_id,
queue_item_id, queue_item_id,
duration_ms, duration_ms,
start_position_ms, start_position_ms,
} => { } => {
let req = PlaybackRequest { let req = PlaybackRequest {
url, stream,
track_id, track_id,
queue_item_id, queue_item_id,
duration_ms, duration_ms,
@@ -411,7 +425,257 @@ impl MediaSource for HttpStreamSource {
// Streaming playback // Streaming playback
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
const QBZ1_UUID: [u8; 16] = [
0x3b, 0x42, 0x12, 0x92, 0x56, 0xf3, 0x5f, 0x75, 0x92, 0x36, 0x63, 0xb6, 0x9a, 0x1f, 0x52, 0xb2,
];
fn read_u32_be(data: &[u8], offset: usize) -> u32 {
u32::from_be_bytes(data[offset..offset + 4].try_into().unwrap())
}
fn read_u24_be(data: &[u8], offset: usize) -> u32 {
((data[offset] as u32) << 16) | ((data[offset + 1] as u32) << 8) | (data[offset + 2] as u32)
}
fn find_dfla_blocks(data: &[u8]) -> Option<Vec<u8>> {
let mut pos = 0usize;
while pos + 8 <= data.len() {
let size = match data[pos..pos + 4].try_into().ok().map(u32::from_be_bytes) {
Some(s) if s >= 8 && pos + s as usize <= data.len() => s as usize,
_ => break,
};
let box_type = &data[pos + 4..pos + 8];
if box_type == b"dfLa" {
let body = &data[pos + 8..pos + size];
if body.len() > 4 {
return Some(body[4..].to_vec());
}
}
let inner = match box_type {
b"moov" | b"trak" | b"mdia" | b"minf" | b"stbl" => Some(&data[pos + 8..pos + size]),
b"stsd" => data.get(pos + 16..pos + size),
b"fLaC" => data.get(pos + 36..pos + size),
_ => None,
};
if let Some(inner_data) = inner {
if let Some(found) = find_dfla_blocks(inner_data) {
return Some(found);
}
}
pos += size;
}
None
}
fn extract_flac_header(init_data: &[u8]) -> Option<Vec<u8>> {
let blocks = find_dfla_blocks(init_data)?;
let mut out = Vec::with_capacity(4 + blocks.len());
out.extend_from_slice(b"fLaC");
out.extend_from_slice(&blocks);
Some(out)
}
fn parse_track_key(encryption_key_hex: Option<&str>) -> Option<[u8; 16]> {
let hex_str = encryption_key_hex?;
let bytes = hex::decode(hex_str).ok()?;
if bytes.len() != 16 {
return None;
}
bytes.try_into().ok()
}
fn decrypt_and_extract_frames(data: &mut [u8], key: Option<&[u8; 16]>) -> Vec<u8> {
let mut frames = Vec::new();
let mut pos = 0usize;
while pos + 8 <= data.len() {
let box_size = read_u32_be(data, pos) as usize;
if box_size < 8 || pos + box_size > data.len() {
break;
}
if data[pos + 4..pos + 8] == *b"uuid"
&& box_size >= 36
&& data[pos + 8..pos + 24] == QBZ1_UUID
{
let body = pos + 24;
if body + 12 > data.len() {
pos += box_size;
continue;
}
let raw_offset = read_u32_be(data, body + 4) as usize;
let num_samples = read_u24_be(data, body + 9) as usize;
let sample_data_start = pos + raw_offset;
let table_start = body + 12;
let mut offset = sample_data_start;
for i in 0..num_samples {
let entry = table_start + i * 16;
if entry + 16 > data.len() {
break;
}
let size = read_u32_be(data, entry) as usize;
let encrypted = u16::from_be_bytes([data[entry + 6], data[entry + 7]]) != 0;
let end = offset + size;
if end <= data.len() {
if encrypted {
if let Some(track_key) = key {
let mut iv = [0u8; 16];
iv[..8].copy_from_slice(&data[entry + 8..entry + 16]);
Ctr128BE::<Aes128>::new(track_key.into(), (&iv).into())
.apply_keystream(&mut data[offset..end]);
}
}
frames.extend_from_slice(&data[offset..end]);
}
offset += size;
}
}
pos += box_size;
}
frames
}
struct SegmentedStreamSource {
client: reqwest::blocking::Client,
url_template: String,
n_segments: usize,
next_segment: usize,
include_segment_one: bool,
key: Option<[u8; 16]>,
pending: VecDeque<u8>,
finished: bool,
}
impl SegmentedStreamSource {
fn fetch_segment(&self, segment: usize) -> io::Result<Vec<u8>> {
let url = self.url_template.replace("$SEGMENT$", &segment.to_string());
for attempt in 0..3 {
if attempt > 0 {
std::thread::sleep(std::time::Duration::from_millis(300 * attempt as u64));
}
match self.client.get(&url).send() {
Ok(mut response) if response.status().is_success() => {
let mut data =
Vec::with_capacity(response.content_length().unwrap_or(0) as usize);
response
.copy_to(&mut data)
.map_err(|e| io::Error::other(format!("segment read failed: {}", e)))?;
return Ok(data);
}
Ok(response) => {
if attempt == 2 {
return Err(io::Error::other(format!(
"segment HTTP {} for {}",
response.status(),
url
)));
}
}
Err(e) => {
if attempt == 2 {
return Err(io::Error::other(format!("segment fetch failed: {}", e)));
}
}
}
}
Err(io::Error::other("segment fetch retries exhausted"))
}
fn fill_pending(&mut self) -> io::Result<()> {
if self.finished || !self.pending.is_empty() {
return Ok(());
}
let segment_to_fetch = if self.include_segment_one {
self.include_segment_one = false;
1usize
} else {
if self.next_segment > self.n_segments {
self.finished = true;
return Ok(());
}
let seg = self.next_segment;
self.next_segment += 1;
seg
};
let mut data = self.fetch_segment(segment_to_fetch)?;
let frames = decrypt_and_extract_frames(&mut data, self.key.as_ref());
if frames.is_empty() {
self.pending.extend(data);
} else {
self.pending.extend(frames);
}
Ok(())
}
}
impl Read for SegmentedStreamSource {
fn read(&mut self, out: &mut [u8]) -> io::Result<usize> {
self.fill_pending()?;
if self.pending.is_empty() {
return Ok(0);
}
let n = out.len().min(self.pending.len());
for slot in out.iter_mut().take(n) {
*slot = self.pending.pop_front().unwrap_or(0);
}
Ok(n)
}
}
impl Seek for SegmentedStreamSource {
fn seek(&mut self, _from: SeekFrom) -> io::Result<u64> {
Err(io::Error::new(
io::ErrorKind::Unsupported,
"segmented source is not seekable",
))
}
}
impl MediaSource for SegmentedStreamSource {
fn is_seekable(&self) -> bool {
false
}
fn byte_len(&self) -> Option<u64> {
None
}
}
fn play_stream( fn play_stream(
stream: &StreamSource,
shared: Arc<SharedState>,
generation: u64,
start_position_ms: u64,
) -> anyhow::Result<()> {
match stream {
StreamSource::DirectUrl(url) => {
play_direct_stream(url, shared, generation, start_position_ms)
}
StreamSource::Segmented {
url_template,
n_segments,
encryption_key_hex,
} => play_segmented_stream(
url_template,
*n_segments,
encryption_key_hex.as_deref(),
shared,
generation,
start_position_ms,
),
}
}
fn play_direct_stream(
url: &str, url: &str,
shared: Arc<SharedState>, shared: Arc<SharedState>,
generation: u64, generation: u64,
@@ -679,3 +943,290 @@ fn play_stream(
} }
Ok(()) Ok(())
} }
fn play_segmented_stream(
url_template: &str,
n_segments: u32,
encryption_key_hex: Option<&str>,
shared: Arc<SharedState>,
generation: u64,
start_position_ms: u64,
) -> anyhow::Result<()> {
if n_segments == 0 {
return Err(anyhow::anyhow!("segmented stream has zero segments"));
}
info!(
"Streaming segmented audio (segments={}, start={}ms)",
n_segments, start_position_ms
);
let client = reqwest::blocking::Client::new();
let segment_duration_ms = 10_000u64;
let mut start_segment = ((start_position_ms / segment_duration_ms) + 1) as usize;
start_segment = start_segment.clamp(1, n_segments as usize);
let segment_start_ms = (start_segment as u64 - 1) * segment_duration_ms;
let mut skip_within_segment_ms = start_position_ms.saturating_sub(segment_start_ms);
let mut pending = VecDeque::new();
let init_url = url_template.replace("$SEGMENT$", "0");
let mut is_flac = false;
if let Ok(response) = client.get(&init_url).send() {
if response.status().is_success() {
if let Ok(init_bytes) = response.bytes() {
if let Some(header) = extract_flac_header(&init_bytes) {
pending.extend(header);
is_flac = true;
}
}
}
}
let key = parse_track_key(encryption_key_hex);
let include_segment_one = key.is_none() && start_segment > 1;
if include_segment_one {
skip_within_segment_ms = skip_within_segment_ms.saturating_add(segment_duration_ms);
}
let source = SegmentedStreamSource {
client,
url_template: url_template.to_string(),
n_segments: n_segments as usize,
next_segment: start_segment,
include_segment_one,
key,
pending,
finished: false,
};
let mss = MediaSourceStream::new(Box::new(source), Default::default());
let mut hint = Hint::new();
if is_flac {
hint.with_extension("flac");
}
let probed = symphonia::default::get_probe().format(
&hint,
mss,
&FormatOptions::default(),
&MetadataOptions::default(),
)?;
let mut format = probed.format;
let track = format
.tracks()
.iter()
.find(|t| t.codec_params.codec != symphonia::core::codecs::CODEC_TYPE_NULL)
.ok_or_else(|| anyhow::anyhow!("no audio track"))?
.clone();
let track_id = track.id;
let sample_rate = track.codec_params.sample_rate.unwrap_or(44100);
let channels = track.codec_params.channels.map(|c| c.count()).unwrap_or(2);
let mut decoder =
symphonia::default::get_codecs().make(&track.codec_params, &DecoderOptions::default())?;
let host = cpal::default_host();
let device = host
.default_output_device()
.ok_or_else(|| anyhow::anyhow!("no audio output device"))?;
info!("Audio output: {}", device.name().unwrap_or_default());
let (sample_tx, sample_rx) = std::sync::mpsc::sync_channel::<Vec<f32>>(4);
let config = cpal::StreamConfig {
channels: channels as u16,
sample_rate: cpal::SampleRate(sample_rate),
buffer_size: cpal::BufferSize::Default,
};
let shared_out = shared.clone();
let played_frames = Arc::new(AtomicU64::new(0));
let queued_frames = Arc::new(AtomicU64::new(0));
let played_frames_out = played_frames.clone();
let queued_frames_out = queued_frames.clone();
let base_position_ms_out = start_position_ms;
let sample_rate_u64 = sample_rate as u64;
let channel_count = channels;
let mut ring_buf: Vec<f32> = Vec::new();
let mut ring_pos = 0;
let stream = device.build_output_stream(
&config,
move |out: &mut [f32], _: &cpal::OutputCallbackInfo| {
let vol = shared_out.volume.load(Ordering::Relaxed) as f32 / 100.0;
let paused = shared_out.paused.load(Ordering::Relaxed);
let mut frames_consumed = 0u64;
for frame in out.chunks_mut(channel_count) {
if paused {
frame.fill(0.0);
continue;
}
if ring_pos >= ring_buf.len() {
match sample_rx.try_recv() {
Ok(buf) => {
ring_buf = buf;
ring_pos = 0;
}
Err(_) => {
frame.fill(0.0);
continue;
}
}
}
if ring_pos + channel_count <= ring_buf.len() {
for sample in frame.iter_mut() {
*sample = ring_buf[ring_pos] * vol;
ring_pos += 1;
}
frames_consumed += 1;
} else {
frame.fill(0.0);
}
}
if frames_consumed > 0 {
let total_played = played_frames_out.fetch_add(frames_consumed, Ordering::Relaxed)
+ frames_consumed;
let played_ms = total_played.saturating_mul(1000) / sample_rate_u64.max(1);
let pos_ms = base_position_ms_out.saturating_add(played_ms);
shared_out.position_ms.store(pos_ms, Ordering::Relaxed);
atomic_saturating_sub_u64(&queued_frames_out, frames_consumed);
}
},
|err| error!("cpal error: {}", err),
None,
)?;
stream.play()?;
info!("Playback started ({}Hz, {}ch)", sample_rate, channels);
let mut finished_naturally = false;
let mut samples_to_skip = ((skip_within_segment_ms as u128)
.saturating_mul(sample_rate as u128)
.saturating_mul(channels as u128)
/ 1000) as u64;
loop {
if shared.generation.load(Ordering::SeqCst) != generation {
info!("Playback superseded by newer generation");
break;
}
if shared.stop_signal.load(Ordering::Relaxed) {
info!("Playback stopped by signal");
break;
}
while shared.paused.load(Ordering::Relaxed) {
std::thread::sleep(std::time::Duration::from_millis(50));
if shared.stop_signal.load(Ordering::Relaxed)
|| shared.generation.load(Ordering::SeqCst) != generation
{
break;
}
}
if shared.stop_signal.load(Ordering::Relaxed)
|| shared.generation.load(Ordering::SeqCst) != generation
{
break;
}
let packet = match format.next_packet() {
Ok(p) => p,
Err(symphonia::core::errors::Error::IoError(ref e))
if e.kind() == std::io::ErrorKind::UnexpectedEof =>
{
info!("Playback finished (gen={})", generation);
finished_naturally = true;
break;
}
Err(symphonia::core::errors::Error::ResetRequired) => {
decoder.reset();
continue;
}
Err(e) => {
warn!("Packet error: {}", e);
break;
}
};
if packet.track_id() != track_id {
continue;
}
let decoded = match decoder.decode(&packet) {
Ok(d) => d,
Err(symphonia::core::errors::Error::DecodeError(e)) => {
warn!("Decode error: {}", e);
continue;
}
Err(e) => {
warn!("Decode error: {}", e);
break;
}
};
let spec = *decoded.spec();
let n_frames = decoded.frames();
let mut sample_buf = SampleBuffer::<f32>::new(n_frames as u64, spec);
sample_buf.copy_interleaved_ref(decoded);
let samples = sample_buf.samples();
if samples.is_empty() {
continue;
}
let start_offset = if samples_to_skip == 0 {
0usize
} else if samples_to_skip >= samples.len() as u64 {
samples_to_skip -= samples.len() as u64;
continue;
} else {
let off = samples_to_skip as usize;
samples_to_skip = 0;
off
};
let samples_vec = samples[start_offset..].to_vec();
let frame_count = (samples_vec.len() / channels) as u64;
if frame_count == 0 {
continue;
}
queued_frames.fetch_add(frame_count, Ordering::Relaxed);
if sample_tx.send(samples_vec).is_err() {
atomic_saturating_sub_u64(&queued_frames, frame_count);
break;
}
}
if finished_naturally {
let drain_deadline = std::time::Instant::now() + std::time::Duration::from_secs(12);
while queued_frames.load(Ordering::Relaxed) > 0 {
if shared.generation.load(Ordering::SeqCst) != generation {
break;
}
if shared.stop_signal.load(Ordering::Relaxed) {
break;
}
if std::time::Instant::now() >= drain_deadline {
warn!(
"Playback drain timeout with {} queued frames",
queued_frames.load(Ordering::Relaxed)
);
break;
}
std::thread::sleep(std::time::Duration::from_millis(20));
}
} else {
std::thread::sleep(std::time::Duration::from_millis(100));
}
drop(stream);
if shared.generation.load(Ordering::SeqCst) == generation {
shared.playing.store(false, Ordering::SeqCst);
shared.paused.store(false, Ordering::SeqCst);
}
Ok(())
}

View File

@@ -6,9 +6,9 @@ use tokio::sync::mpsc;
use tokio_tungstenite::tungstenite::Message; use tokio_tungstenite::tungstenite::Message;
use tracing::{debug, error, info, warn}; use tracing::{debug, error, info, warn};
use crate::api::QobuzApi; use crate::api::{QobuzApi, TrackStream};
use crate::config::Config; use crate::config::Config;
use crate::player::{AudioPlayer, PlayerCommand, PlayerState}; use crate::player::{AudioPlayer, PlayerCommand, PlayerState, StreamSource};
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// Protobuf helpers (hand-rolled, matching the qconnect.proto schema) // Protobuf helpers (hand-rolled, matching the qconnect.proto schema)
@@ -503,8 +503,25 @@ fn parse_incoming_commands(data: &[u8]) -> Vec<QConnectCommand> {
get_varint_field(&qvf, 1).unwrap_or(0) as u32 get_varint_field(&qvf, 1).unwrap_or(0) as u32
}) })
.unwrap_or(0); .unwrap_or(0);
let current_track = get_bytes_field(&fields, 4).map(parse_queue_track); let current_track = get_bytes_field(&fields, 4)
let next_track = get_bytes_field(&fields, 5).map(parse_queue_track); .map(parse_queue_track)
.and_then(|t| {
if t.track_id <= 0 || t.queue_item_id < 0 {
None
} else {
Some(t)
}
});
let next_track =
get_bytes_field(&fields, 5)
.map(parse_queue_track)
.and_then(|t| {
if t.track_id <= 0 || t.queue_item_id < 0 {
None
} else {
Some(t)
}
});
info!("[RECV] SET_STATE: playing_state={:?}, position_ms={:?}, current_track={:?}, next_track={:?}, queue_ver={}", info!("[RECV] SET_STATE: playing_state={:?}, position_ms={:?}, current_track={:?}, next_track={:?}, queue_ver={}",
playing_state, position_ms, current_track, next_track, queue_version_major); playing_state, position_ms, current_track, next_track, queue_version_major);
@@ -1026,11 +1043,31 @@ async fn run_connection(
Err(e) => { warn!("get_track failed: {}", e); 0 } Err(e) => { warn!("get_track failed: {}", e); 0 }
}; };
current_duration_ms = duration_ms; current_duration_ms = duration_ms;
match api.get_track_url(auth_token, &track_id_str, format_id).await { match api.get_track_stream(auth_token, &track_id_str, format_id).await {
Ok(url) => { Ok(stream) => {
info!("[STATE] Got URL, playing (duration={}ms)", duration_ms); let player_stream = match stream {
TrackStream::DirectUrl { url } => {
info!("[STATE] Got direct stream URL (duration={}ms)", duration_ms);
StreamSource::DirectUrl(url)
}
TrackStream::Segmented {
url_template,
n_segments,
encryption_key_hex,
} => {
info!(
"[STATE] Got segmented stream (segments={}, duration={}ms)",
n_segments, duration_ms
);
StreamSource::Segmented {
url_template,
n_segments,
encryption_key_hex,
}
}
};
player.send(PlayerCommand::Play { player.send(PlayerCommand::Play {
url, stream: player_stream,
track_id: track.track_id, track_id: track.track_id,
queue_item_id: track.queue_item_id, queue_item_id: track.queue_item_id,
duration_ms, duration_ms,
@@ -1216,10 +1253,22 @@ async fn run_connection(
Err(e) => { warn!("get_track failed: {}", e); current_duration_ms } Err(e) => { warn!("get_track failed: {}", e); current_duration_ms }
}; };
current_duration_ms = duration_ms; current_duration_ms = duration_ms;
match api.get_track_url(auth_token, &track_id_str, format_id).await { match api.get_track_stream(auth_token, &track_id_str, format_id).await {
Ok(url) => { Ok(stream) => {
let player_stream = match stream {
TrackStream::DirectUrl { url } => StreamSource::DirectUrl(url),
TrackStream::Segmented {
url_template,
n_segments,
encryption_key_hex,
} => StreamSource::Segmented {
url_template,
n_segments,
encryption_key_hex,
},
};
player.send(PlayerCommand::Play { player.send(PlayerCommand::Play {
url, stream: player_stream,
track_id: current_track_id, track_id: current_track_id,
queue_item_id: current_queue_item_id, queue_item_id: current_queue_item_id,
duration_ms, duration_ms,

View File

@@ -251,6 +251,27 @@ pub struct Track {
pub rights: Option<Rights>, pub rights: Option<Rights>,
} }
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PlaybackSession {
pub session_id: String,
pub expires_at: Option<u64>,
pub infos: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FileUrlResponse {
pub track_id: Option<i64>,
pub duration: Option<f64>,
pub url: Option<String>,
pub url_template: Option<String>,
pub n_segments: Option<u32>,
pub format_id: Option<i32>,
pub mime_type: Option<String>,
pub sampling_rate: Option<f64>,
pub bit_depth: Option<i32>,
pub key: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Playlist { pub struct Playlist {
pub id: u64, pub id: u64,