Compare commits
10 Commits
fix/seek-p
...
feat/persi
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
778f5fc69e | ||
|
|
a26db5cf96 | ||
|
|
20d5ecf231 | ||
|
|
c3cad15719 | ||
|
|
7b882a727a | ||
|
|
bacb40af58 | ||
|
|
4c19691b75 | ||
|
|
749b0c1aaf | ||
|
|
bb362686b4 | ||
|
|
6296acc6dd |
32
Cargo.lock
generated
32
Cargo.lock
generated
@@ -196,6 +196,15 @@ dependencies = [
|
||||
"generic-array",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "block-padding"
|
||||
version = "0.3.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a8894febbff9f758034a5b8e12d87918f56dfc64a8e1fe757d65e29041538d93"
|
||||
dependencies = [
|
||||
"generic-array",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "bumpalo"
|
||||
version = "3.20.2"
|
||||
@@ -220,6 +229,15 @@ version = "1.11.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1e748733b7cbc798e1434b6ac524f0c1ff2ab456fe201501e6497c8417a4fc33"
|
||||
|
||||
[[package]]
|
||||
name = "cbc"
|
||||
version = "0.1.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "26b52a9543ae338f279b96b0b9fed9c8093744685043739079ce85cd58f289a6"
|
||||
dependencies = [
|
||||
"cipher",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "cc"
|
||||
version = "1.2.57"
|
||||
@@ -750,6 +768,15 @@ version = "0.4.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70"
|
||||
|
||||
[[package]]
|
||||
name = "hkdf"
|
||||
version = "0.12.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7b5f8eb2ad728638ea2c7d47a21db23b7b58a72ed6a38256b8a1849f15fbbdf7"
|
||||
dependencies = [
|
||||
"hmac",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "hmac"
|
||||
version = "0.12.1"
|
||||
@@ -1009,6 +1036,7 @@ version = "0.1.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "879f10e63c20629ecabbb64a8010319738c66a5cd0c29b02d63d272b03751d01"
|
||||
dependencies = [
|
||||
"block-padding",
|
||||
"generic-array",
|
||||
]
|
||||
|
||||
@@ -1578,15 +1606,19 @@ dependencies = [
|
||||
name = "qobuzd"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"aes",
|
||||
"aes-gcm",
|
||||
"anyhow",
|
||||
"base64",
|
||||
"cbc",
|
||||
"chrono",
|
||||
"clap",
|
||||
"cpal",
|
||||
"ctr",
|
||||
"directories",
|
||||
"futures-util",
|
||||
"hex",
|
||||
"hkdf",
|
||||
"hmac",
|
||||
"keyring",
|
||||
"md-5",
|
||||
|
||||
@@ -36,6 +36,10 @@ futures-util = "0.3"
|
||||
symphonia = { version = "0.5", features = ["flac", "pcm", "mp3", "aac", "ogg", "wav"] }
|
||||
cpal = "0.15"
|
||||
rubato = "0.15"
|
||||
aes = "0.8"
|
||||
cbc = "0.1"
|
||||
ctr = "0.9"
|
||||
hkdf = "0.12"
|
||||
|
||||
[profile.release]
|
||||
strip = true
|
||||
|
||||
249
src/api.rs
249
src/api.rs
@@ -2,8 +2,98 @@ use crate::config::Config;
|
||||
use crate::crypto;
|
||||
use crate::error::{QobuzError, Result};
|
||||
use crate::types::*;
|
||||
use aes::Aes128;
|
||||
use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine};
|
||||
use cbc::cipher::{block_padding::NoPadding, BlockDecryptMut, KeyIvInit};
|
||||
use hkdf::Hkdf;
|
||||
use reqwest::Client;
|
||||
use sha2::Sha256;
|
||||
use std::time::Duration;
|
||||
use tracing::warn;
|
||||
|
||||
type Aes128CbcDec = cbc::Decryptor<Aes128>;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum TrackStream {
|
||||
DirectUrl {
|
||||
url: String,
|
||||
sampling_rate_hz: Option<u32>,
|
||||
bit_depth: Option<u32>,
|
||||
channels: Option<u32>,
|
||||
},
|
||||
Segmented {
|
||||
url_template: String,
|
||||
n_segments: u32,
|
||||
encryption_key_hex: Option<String>,
|
||||
sampling_rate_hz: Option<u32>,
|
||||
bit_depth: Option<u32>,
|
||||
channels: Option<u32>,
|
||||
},
|
||||
}
|
||||
|
||||
fn b64url_decode(s: &str) -> Result<Vec<u8>> {
|
||||
URL_SAFE_NO_PAD
|
||||
.decode(s.trim_end_matches('='))
|
||||
.map_err(|e| QobuzError::CryptoError(format!("base64url decode failed: {}", e)))
|
||||
}
|
||||
|
||||
fn derive_track_key_hex(
|
||||
session_infos: &str,
|
||||
app_secret_hex: &str,
|
||||
key_field: &str,
|
||||
) -> Result<String> {
|
||||
let infos_parts: Vec<&str> = session_infos.splitn(2, '.').collect();
|
||||
if infos_parts.len() != 2 {
|
||||
return Err(QobuzError::CryptoError(format!(
|
||||
"invalid session infos format: {}",
|
||||
session_infos
|
||||
)));
|
||||
}
|
||||
|
||||
let salt = b64url_decode(infos_parts[0])?;
|
||||
let info = b64url_decode(infos_parts[1])?;
|
||||
let ikm = hex::decode(app_secret_hex)
|
||||
.map_err(|e| QobuzError::CryptoError(format!("invalid app secret hex: {}", e)))?;
|
||||
|
||||
let hk = Hkdf::<Sha256>::new(Some(&salt), &ikm);
|
||||
let mut kek = [0u8; 16];
|
||||
hk.expand(&info, &mut kek)
|
||||
.map_err(|e| QobuzError::CryptoError(format!("HKDF expand failed: {e:?}")))?;
|
||||
|
||||
let key_parts: Vec<&str> = key_field.splitn(3, '.').collect();
|
||||
if key_parts.len() != 3 || key_parts[0] != "qbz-1" {
|
||||
return Err(QobuzError::CryptoError(format!(
|
||||
"unexpected key field format: {}",
|
||||
key_field
|
||||
)));
|
||||
}
|
||||
|
||||
let ciphertext = b64url_decode(key_parts[1])?;
|
||||
let iv_bytes = b64url_decode(key_parts[2])?;
|
||||
if ciphertext.len() < 16 || iv_bytes.len() < 16 {
|
||||
return Err(QobuzError::CryptoError(format!(
|
||||
"invalid key field lengths: ciphertext={} iv={}",
|
||||
ciphertext.len(),
|
||||
iv_bytes.len()
|
||||
)));
|
||||
}
|
||||
|
||||
let mut buf = ciphertext;
|
||||
let iv: [u8; 16] = iv_bytes[..16]
|
||||
.try_into()
|
||||
.map_err(|_| QobuzError::CryptoError("invalid IV length".to_string()))?;
|
||||
let decrypted = Aes128CbcDec::new(&kek.into(), &iv.into())
|
||||
.decrypt_padded_mut::<NoPadding>(&mut buf)
|
||||
.map_err(|e| QobuzError::CryptoError(format!("AES-CBC decrypt failed: {e:?}")))?;
|
||||
|
||||
if decrypted.len() < 16 {
|
||||
return Err(QobuzError::CryptoError(
|
||||
"decrypted track key too short".to_string(),
|
||||
));
|
||||
}
|
||||
|
||||
Ok(hex::encode(&decrypted[..16]))
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct QobuzApi {
|
||||
@@ -311,7 +401,118 @@ impl QobuzApi {
|
||||
Ok(album)
|
||||
}
|
||||
|
||||
pub async fn get_track_url(
|
||||
async fn start_playback_session(&self, access_token: &str) -> Result<PlaybackSession> {
|
||||
let timestamp = self.get_timestamp();
|
||||
let signature =
|
||||
crypto::generate_request_signature("session/start", &[("profile", "qbz-1")], timestamp);
|
||||
|
||||
let url = format!(
|
||||
"{}/api.json/0.2/session/start?app_id={}&request_ts={}&request_sig={}",
|
||||
self.base_url, self.app_id, timestamp, signature
|
||||
);
|
||||
|
||||
let response = self
|
||||
.client
|
||||
.post(&url)
|
||||
.headers(self.build_auth_headers(Some(access_token)))
|
||||
.form(&[("profile", "qbz-1")])
|
||||
.send()
|
||||
.await?;
|
||||
|
||||
if !response.status().is_success() {
|
||||
let status = response.status();
|
||||
let body = response.text().await.unwrap_or_default();
|
||||
return Err(QobuzError::ApiError(format!(
|
||||
"Failed to start playback session: {} - {}",
|
||||
status, body
|
||||
)));
|
||||
}
|
||||
|
||||
let session: PlaybackSession = response.json().await?;
|
||||
Ok(session)
|
||||
}
|
||||
|
||||
async fn get_track_stream_mobile(
|
||||
&self,
|
||||
access_token: &str,
|
||||
track_id: &str,
|
||||
format_id: u32,
|
||||
) -> Result<TrackStream> {
|
||||
let session = self.start_playback_session(access_token).await?;
|
||||
|
||||
let timestamp = self.get_timestamp();
|
||||
let format_id_str = format_id.to_string();
|
||||
let signature = crypto::generate_request_signature(
|
||||
"file/url",
|
||||
&[
|
||||
("format_id", &format_id_str),
|
||||
("intent", "stream"),
|
||||
("track_id", track_id),
|
||||
],
|
||||
timestamp,
|
||||
);
|
||||
|
||||
let url = format!(
|
||||
"{}/api.json/0.2/file/url?app_id={}&track_id={}&format_id={}&intent=stream&request_ts={}&request_sig={}",
|
||||
self.base_url, self.app_id, track_id, format_id, timestamp, signature
|
||||
);
|
||||
|
||||
let mut headers = self.build_auth_headers(Some(access_token));
|
||||
headers.insert(
|
||||
"X-Session-Id",
|
||||
session.session_id.parse().map_err(|e| {
|
||||
QobuzError::ApiError(format!("Invalid X-Session-Id header value: {}", e))
|
||||
})?,
|
||||
);
|
||||
|
||||
let response = self.client.get(&url).headers(headers).send().await?;
|
||||
if !response.status().is_success() {
|
||||
let status = response.status();
|
||||
let body = response.text().await.unwrap_or_default();
|
||||
return Err(QobuzError::ApiError(format!(
|
||||
"Failed to get mobile file URL: {} - {}",
|
||||
status, body
|
||||
)));
|
||||
}
|
||||
|
||||
let mut file: FileUrlResponse = response.json().await?;
|
||||
|
||||
if let (Some(key_field), Some(infos)) = (file.key.clone(), session.infos.as_deref()) {
|
||||
match derive_track_key_hex(infos, &crypto::APP_SECRET, &key_field) {
|
||||
Ok(unwrapped) => file.key = Some(unwrapped),
|
||||
Err(e) => {
|
||||
warn!("Failed to unwrap track key for {}: {}", track_id, e);
|
||||
file.key = None;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(url) = file.url {
|
||||
return Ok(TrackStream::DirectUrl {
|
||||
url,
|
||||
sampling_rate_hz: file.sampling_rate.map(|v| v.round() as u32),
|
||||
bit_depth: file.bit_depth.map(|v| v as u32),
|
||||
channels: Some(2),
|
||||
});
|
||||
}
|
||||
|
||||
if let (Some(url_template), Some(n_segments)) = (file.url_template, file.n_segments) {
|
||||
return Ok(TrackStream::Segmented {
|
||||
url_template,
|
||||
n_segments,
|
||||
encryption_key_hex: file.key,
|
||||
sampling_rate_hz: file.sampling_rate.map(|v| v.round() as u32),
|
||||
bit_depth: file.bit_depth.map(|v| v as u32),
|
||||
channels: Some(2),
|
||||
});
|
||||
}
|
||||
|
||||
Err(QobuzError::ApiError(
|
||||
"Mobile file/url response did not contain url or url_template".to_string(),
|
||||
))
|
||||
}
|
||||
|
||||
async fn get_track_url_legacy(
|
||||
&self,
|
||||
access_token: &str,
|
||||
track_id: &str,
|
||||
@@ -360,6 +561,52 @@ impl QobuzApi {
|
||||
Ok(url_response.url)
|
||||
}
|
||||
|
||||
pub async fn get_track_stream(
|
||||
&self,
|
||||
access_token: &str,
|
||||
track_id: &str,
|
||||
format_id: u32,
|
||||
) -> Result<TrackStream> {
|
||||
match self
|
||||
.get_track_stream_mobile(access_token, track_id, format_id)
|
||||
.await
|
||||
{
|
||||
Ok(stream) => Ok(stream),
|
||||
Err(e) => {
|
||||
warn!(
|
||||
"Mobile file/url failed for track {} (format {}), falling back to legacy endpoint: {}",
|
||||
track_id, format_id, e
|
||||
);
|
||||
let url = self
|
||||
.get_track_url_legacy(access_token, track_id, format_id)
|
||||
.await?;
|
||||
Ok(TrackStream::DirectUrl {
|
||||
url,
|
||||
sampling_rate_hz: None,
|
||||
bit_depth: None,
|
||||
channels: None,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn get_track_url(
|
||||
&self,
|
||||
access_token: &str,
|
||||
track_id: &str,
|
||||
format_id: u32,
|
||||
) -> Result<String> {
|
||||
match self
|
||||
.get_track_stream(access_token, track_id, format_id)
|
||||
.await?
|
||||
{
|
||||
TrackStream::DirectUrl { url, .. } => Ok(url),
|
||||
TrackStream::Segmented { .. } => Err(QobuzError::ApiError(
|
||||
"Track uses segmented stream; use get_track_stream instead".to_string(),
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn get_track(&self, access_token: &str, track_id: &str) -> Result<Track> {
|
||||
let timestamp = self.get_timestamp();
|
||||
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
use md5::{Digest, Md5};
|
||||
use sha1::Sha1;
|
||||
|
||||
const APP_SECRET: &str = "e79f8b9be485692b0e5f9dd895826368";
|
||||
pub const APP_SECRET: &str = "e79f8b9be485692b0e5f9dd895826368";
|
||||
|
||||
pub fn md5_hash(input: &str) -> String {
|
||||
let mut hasher = Md5::new();
|
||||
|
||||
31
src/main.rs
31
src/main.rs
@@ -5,7 +5,7 @@ use tokio::sync::Mutex;
|
||||
use tracing::{error, info, Level};
|
||||
use tracing_subscriber::FmtSubscriber;
|
||||
|
||||
use qobuzd::api::QobuzApi;
|
||||
use qobuzd::api::{QobuzApi, TrackStream};
|
||||
use qobuzd::auth::QobuzAuth;
|
||||
use qobuzd::config::Config;
|
||||
use qobuzd::qconnect::QConnect;
|
||||
@@ -178,8 +178,33 @@ async fn main() -> Result<()> {
|
||||
let token = guard.get_valid_token().await?;
|
||||
drop(guard);
|
||||
let api = QobuzApi::new(&config);
|
||||
match api.get_track_url(&token, &track_id, format_id).await {
|
||||
Ok(url) => println!("Stream URL: {}", url),
|
||||
match api.get_track_stream(&token, &track_id, format_id).await {
|
||||
Ok(TrackStream::DirectUrl { url, .. }) => println!("Stream URL: {}", url),
|
||||
Ok(TrackStream::Segmented {
|
||||
url_template,
|
||||
n_segments,
|
||||
encryption_key_hex,
|
||||
sampling_rate_hz,
|
||||
bit_depth,
|
||||
..
|
||||
}) => {
|
||||
println!("Segmented stream template: {}", url_template);
|
||||
println!("Segments: {}", n_segments);
|
||||
println!(
|
||||
"Encrypted: {}",
|
||||
if encryption_key_hex.is_some() {
|
||||
"yes"
|
||||
} else {
|
||||
"no"
|
||||
}
|
||||
);
|
||||
if let Some(sr) = sampling_rate_hz {
|
||||
println!("Sampling rate: {} Hz", sr);
|
||||
}
|
||||
if let Some(bits) = bit_depth {
|
||||
println!("Bit depth: {}", bits);
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Failed: {}", e);
|
||||
std::process::exit(1);
|
||||
|
||||
1056
src/player.rs
1056
src/player.rs
File diff suppressed because it is too large
Load Diff
334
src/qconnect.rs
334
src/qconnect.rs
@@ -6,9 +6,9 @@ use tokio::sync::mpsc;
|
||||
use tokio_tungstenite::tungstenite::Message;
|
||||
use tracing::{debug, error, info, warn};
|
||||
|
||||
use crate::api::QobuzApi;
|
||||
use crate::api::{QobuzApi, TrackStream};
|
||||
use crate::config::Config;
|
||||
use crate::player::{AudioPlayer, PlayerCommand, PlayerState};
|
||||
use crate::player::{AudioPlayer, PlayerCommand, PlayerState, StreamSource};
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Protobuf helpers (hand-rolled, matching the qconnect.proto schema)
|
||||
@@ -385,13 +385,49 @@ fn msg_volume_changed(volume: u64) -> Vec<u8> {
|
||||
}
|
||||
|
||||
/// RNDR_SRVR_MAX_AUDIO_QUALITY_CHANGED (28): renderer confirms quality setting.
|
||||
/// Includes maxAudioQuality (field 1) and networkType (field 2: 1=WIFI, 2=CELLULAR, 3=UNKNOWN).
|
||||
fn msg_max_audio_quality_changed(quality: u64) -> Vec<u8> {
|
||||
/// networkType (field 2) is optional; when absent, quality applies generically.
|
||||
fn msg_max_audio_quality_changed(quality: u64, network_type: Option<u64>) -> Vec<u8> {
|
||||
let mut payload = encode_field_varint(1, quality);
|
||||
payload.extend(encode_field_varint(2, 1)); // networkType = WIFI
|
||||
if let Some(network_type) = network_type {
|
||||
payload.extend(encode_field_varint(2, network_type));
|
||||
}
|
||||
build_qconnect_message(28, &payload)
|
||||
}
|
||||
|
||||
fn msg_file_audio_quality_changed(
|
||||
sampling_rate_hz: u64,
|
||||
bit_depth: u64,
|
||||
channels: u64,
|
||||
audio_quality: u64,
|
||||
) -> Vec<u8> {
|
||||
let mut payload = encode_field_varint(1, sampling_rate_hz);
|
||||
payload.extend(encode_field_varint(2, bit_depth));
|
||||
payload.extend(encode_field_varint(3, channels));
|
||||
payload.extend(encode_field_varint(4, audio_quality));
|
||||
build_qconnect_message(26, &payload)
|
||||
}
|
||||
|
||||
fn msg_device_audio_quality_changed(
|
||||
sampling_rate_hz: u64,
|
||||
bit_depth: u64,
|
||||
channels: u64,
|
||||
) -> Vec<u8> {
|
||||
let mut payload = encode_field_varint(1, sampling_rate_hz);
|
||||
payload.extend(encode_field_varint(2, bit_depth));
|
||||
payload.extend(encode_field_varint(3, channels));
|
||||
build_qconnect_message(27, &payload)
|
||||
}
|
||||
|
||||
fn quality_fallback_audio_params(quality: u32) -> (u32, u32, u32) {
|
||||
match quality {
|
||||
1 => (44100, 16, 2), // MP3
|
||||
2 => (44100, 16, 2), // CD
|
||||
3 => (96000, 24, 2), // Hi-Res up to 96kHz
|
||||
4 | 5 => (192000, 24, 2), // Hi-Res up to 192/384kHz (use 192kHz fallback)
|
||||
_ => (44100, 16, 2),
|
||||
}
|
||||
}
|
||||
|
||||
/// RNDR_SRVR_VOLUME_MUTED (29): renderer confirms mute state.
|
||||
fn msg_volume_muted(muted: bool) -> Vec<u8> {
|
||||
let payload = encode_field_varint(1, if muted { 1 } else { 0 });
|
||||
@@ -441,7 +477,7 @@ fn extract_qconnect_messages(frame_body: &[u8]) -> Vec<(u32, Vec<u8>)> {
|
||||
pub enum QConnectCommand {
|
||||
SetState {
|
||||
playing_state: Option<u32>, // None = not set (keep current), Some(1)=stopped, Some(2)=playing, Some(3)=paused
|
||||
position_ms: u32,
|
||||
position_ms: Option<u32>, // None = field not present
|
||||
current_track: Option<TrackRef>,
|
||||
next_track: Option<TrackRef>,
|
||||
queue_version_major: u32,
|
||||
@@ -496,17 +532,34 @@ fn parse_incoming_commands(data: &[u8]) -> Vec<QConnectCommand> {
|
||||
41 => {
|
||||
let fields = parse_fields(&payload);
|
||||
let playing_state = get_varint_field(&fields, 1).map(|v| v as u32); // None = not present
|
||||
let position_ms = get_varint_field(&fields, 2).unwrap_or(0) as u32;
|
||||
let position_ms = get_varint_field(&fields, 2).map(|v| v as u32);
|
||||
let queue_version_major = get_bytes_field(&fields, 3)
|
||||
.map(|qv| {
|
||||
let qvf = parse_fields(qv);
|
||||
get_varint_field(&qvf, 1).unwrap_or(0) as u32
|
||||
})
|
||||
.unwrap_or(0);
|
||||
let current_track = get_bytes_field(&fields, 4).map(parse_queue_track);
|
||||
let next_track = get_bytes_field(&fields, 5).map(parse_queue_track);
|
||||
let current_track = get_bytes_field(&fields, 4)
|
||||
.map(parse_queue_track)
|
||||
.and_then(|t| {
|
||||
if t.track_id <= 0 || t.queue_item_id < 0 {
|
||||
None
|
||||
} else {
|
||||
Some(t)
|
||||
}
|
||||
});
|
||||
let next_track =
|
||||
get_bytes_field(&fields, 5)
|
||||
.map(parse_queue_track)
|
||||
.and_then(|t| {
|
||||
if t.track_id <= 0 || t.queue_item_id < 0 {
|
||||
None
|
||||
} else {
|
||||
Some(t)
|
||||
}
|
||||
});
|
||||
|
||||
info!("[RECV] SET_STATE: playing_state={:?}, position_ms={}, current_track={:?}, next_track={:?}, queue_ver={}",
|
||||
info!("[RECV] SET_STATE: playing_state={:?}, position_ms={:?}, current_track={:?}, next_track={:?}, queue_ver={}",
|
||||
playing_state, position_ms, current_track, next_track, queue_version_major);
|
||||
|
||||
QConnectCommand::SetState {
|
||||
@@ -851,6 +904,7 @@ async fn run_connection(
|
||||
let mut last_play_command_at: std::time::Instant = std::time::Instant::now();
|
||||
let mut has_seen_position_progress = false; // true once we've seen pos > 0 after a Play
|
||||
let mut track_ended = false; // true when player finishes track naturally
|
||||
let mut ignore_nonzero_seek_until: Option<std::time::Instant> = None;
|
||||
|
||||
// Helper macro: send a state update
|
||||
macro_rules! send_state {
|
||||
@@ -939,20 +993,42 @@ async fn run_connection(
|
||||
} => {
|
||||
info!("[STATE] SET_STATE: playing_state={:?} current_track={:?} next_track={:?} pos={}",
|
||||
playing_state, current_track.as_ref().map(|t| t.track_id),
|
||||
next_track.as_ref().map(|t| t.track_id), position_ms);
|
||||
next_track.as_ref().map(|t| t.track_id), position_ms.unwrap_or(0));
|
||||
|
||||
let requested_pos = position_ms.map(|p| p as u64);
|
||||
|
||||
let seek_only_state = playing_state.is_none()
|
||||
&& current_track.is_none()
|
||||
&& next_track.is_none()
|
||||
&& *queue_version_major == 0
|
||||
&& *position_ms > 0;
|
||||
&& requested_pos.is_some();
|
||||
|
||||
if seek_only_state {
|
||||
let target_pos = *position_ms as u64;
|
||||
let target_pos = requested_pos.unwrap_or(0);
|
||||
let status = player.status();
|
||||
let current_player_pos = status.position_ms;
|
||||
let should_seek = status.state != PlayerState::Stopped
|
||||
&& target_pos.abs_diff(current_player_pos) > 800;
|
||||
let mut should_seek = target_pos == 0
|
||||
|| target_pos.abs_diff(current_player_pos) > 350;
|
||||
let suppress_nonzero_seek = target_pos > 0
|
||||
&& ignore_nonzero_seek_until
|
||||
.map(|deadline| std::time::Instant::now() < deadline)
|
||||
.unwrap_or(false);
|
||||
if suppress_nonzero_seek {
|
||||
should_seek = false;
|
||||
info!(
|
||||
"[STATE] Ignoring non-zero seek {}ms during settle window",
|
||||
target_pos
|
||||
);
|
||||
}
|
||||
|
||||
info!(
|
||||
"[STATE] seek-only command: target={}ms local={}ms state={:?} track={} should_seek={}",
|
||||
target_pos,
|
||||
current_player_pos,
|
||||
status.state,
|
||||
status.track_id,
|
||||
should_seek
|
||||
);
|
||||
|
||||
if should_seek {
|
||||
info!(
|
||||
@@ -961,11 +1037,12 @@ async fn run_connection(
|
||||
);
|
||||
player.send(PlayerCommand::Seek(target_pos));
|
||||
track_ended = false;
|
||||
|
||||
// ACTION_TYPE_SEEK = 8
|
||||
let action_msg = msg_renderer_action(8, Some(*position_ms));
|
||||
ws_tx.send(Message::Binary(build_payload_frame(msg_id, &action_msg).into())).await?;
|
||||
msg_id += 1;
|
||||
if target_pos == 0 {
|
||||
ignore_nonzero_seek_until = Some(
|
||||
std::time::Instant::now()
|
||||
+ std::time::Duration::from_secs(2),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
current_position_ms = target_pos;
|
||||
@@ -987,11 +1064,12 @@ async fn run_connection(
|
||||
current_queue_item_id = track.queue_item_id;
|
||||
current_playing_state = 2;
|
||||
current_buffer_state = 1; // BUFFERING
|
||||
current_position_ms = *position_ms as u64;
|
||||
current_position_ms = requested_pos.unwrap_or(0);
|
||||
current_duration_ms = 0;
|
||||
last_play_command_at = std::time::Instant::now();
|
||||
has_seen_position_progress = false;
|
||||
track_ended = false;
|
||||
ignore_nonzero_seek_until = None;
|
||||
send_state!(ws_tx, msg_id);
|
||||
|
||||
let track_id_str = track.track_id.to_string();
|
||||
@@ -1001,16 +1079,74 @@ async fn run_connection(
|
||||
Err(e) => { warn!("get_track failed: {}", e); 0 }
|
||||
};
|
||||
current_duration_ms = duration_ms;
|
||||
match api.get_track_url(auth_token, &track_id_str, format_id).await {
|
||||
Ok(url) => {
|
||||
info!("[STATE] Got URL, playing (duration={}ms)", duration_ms);
|
||||
player.send(PlayerCommand::Play {
|
||||
match api.get_track_stream(auth_token, &track_id_str, format_id).await {
|
||||
Ok(stream) => {
|
||||
let (player_stream, stream_sr, stream_bits, stream_ch) = match stream {
|
||||
TrackStream::DirectUrl {
|
||||
url,
|
||||
sampling_rate_hz,
|
||||
bit_depth,
|
||||
channels,
|
||||
} => {
|
||||
info!("[STATE] Got direct stream URL (duration={}ms)", duration_ms);
|
||||
(
|
||||
StreamSource::DirectUrl(url),
|
||||
sampling_rate_hz,
|
||||
bit_depth,
|
||||
channels,
|
||||
)
|
||||
}
|
||||
TrackStream::Segmented {
|
||||
url_template,
|
||||
n_segments,
|
||||
encryption_key_hex,
|
||||
sampling_rate_hz,
|
||||
bit_depth,
|
||||
channels,
|
||||
} => {
|
||||
info!(
|
||||
"[STATE] Got segmented stream (segments={}, duration={}ms)",
|
||||
n_segments, duration_ms
|
||||
);
|
||||
(
|
||||
StreamSource::Segmented {
|
||||
url_template,
|
||||
n_segments,
|
||||
encryption_key_hex,
|
||||
},
|
||||
sampling_rate_hz,
|
||||
bit_depth,
|
||||
channels,
|
||||
)
|
||||
}
|
||||
};
|
||||
player.send(PlayerCommand::Play {
|
||||
stream: player_stream,
|
||||
track_id: track.track_id,
|
||||
queue_item_id: track.queue_item_id,
|
||||
duration_ms,
|
||||
start_position_ms: *position_ms as u64,
|
||||
start_position_ms: requested_pos.unwrap_or(0),
|
||||
});
|
||||
let (fallback_sr, fallback_bits, fallback_ch) =
|
||||
quality_fallback_audio_params(max_audio_quality);
|
||||
let sr = stream_sr.unwrap_or(fallback_sr).max(1);
|
||||
let bits = stream_bits.unwrap_or(fallback_bits).max(1);
|
||||
let ch = stream_ch.unwrap_or(fallback_ch).max(1);
|
||||
let file_msg = msg_file_audio_quality_changed(
|
||||
sr as u64,
|
||||
bits as u64,
|
||||
ch as u64,
|
||||
max_audio_quality as u64,
|
||||
);
|
||||
ws_tx.send(Message::Binary(build_payload_frame(msg_id, &file_msg).into())).await?;
|
||||
msg_id += 1;
|
||||
let dev_msg = msg_device_audio_quality_changed(
|
||||
sr as u64,
|
||||
bits as u64,
|
||||
ch as u64,
|
||||
);
|
||||
ws_tx.send(Message::Binary(build_payload_frame(msg_id, &dev_msg).into())).await?;
|
||||
msg_id += 1;
|
||||
current_buffer_state = 2; // OK
|
||||
}
|
||||
Err(e) => {
|
||||
@@ -1028,7 +1164,23 @@ async fn run_connection(
|
||||
if !loaded_new_track {
|
||||
match ps {
|
||||
2 => {
|
||||
if current_playing_state == 3 {
|
||||
let status = player.status();
|
||||
let should_restart_same_track = current_track.is_some()
|
||||
&& (track_ended || status.state == PlayerState::Stopped);
|
||||
|
||||
if should_restart_same_track {
|
||||
let restart_pos = requested_pos.unwrap_or(0);
|
||||
info!(
|
||||
"[STATE] Restarting current track from {}ms (ended={} player_state={:?})",
|
||||
restart_pos,
|
||||
track_ended,
|
||||
status.state
|
||||
);
|
||||
player.send(PlayerCommand::Seek(restart_pos));
|
||||
current_playing_state = 2;
|
||||
current_position_ms = restart_pos;
|
||||
track_ended = false;
|
||||
} else if current_playing_state == 3 {
|
||||
info!("[STATE] Resuming playback");
|
||||
player.send(PlayerCommand::Resume);
|
||||
current_playing_state = 2;
|
||||
@@ -1042,8 +1194,8 @@ async fn run_connection(
|
||||
info!("[STATE] Pausing playback");
|
||||
player.send(PlayerCommand::Pause);
|
||||
current_playing_state = 3;
|
||||
if *position_ms > 0 {
|
||||
current_position_ms = *position_ms as u64;
|
||||
if let Some(pos) = requested_pos {
|
||||
current_position_ms = pos;
|
||||
} else {
|
||||
current_position_ms = player.status().position_ms;
|
||||
}
|
||||
@@ -1065,20 +1217,56 @@ async fn run_connection(
|
||||
|
||||
// 4. Apply seek position if provided and not loading new track
|
||||
let is_pause = matches!(playing_state, Some(3));
|
||||
if !loaded_new_track && *position_ms > 0 && !is_pause {
|
||||
let requested = *position_ms as u64;
|
||||
let position_control_state = *queue_version_major == 0
|
||||
&& current_track.is_none()
|
||||
&& next_track.is_none()
|
||||
&& requested_pos.is_some();
|
||||
if !loaded_new_track && !is_pause && position_control_state {
|
||||
let requested = requested_pos.unwrap_or(0);
|
||||
let status = player.status();
|
||||
let local = status.position_ms;
|
||||
if status.state != PlayerState::Stopped && requested.abs_diff(local) > 1500 {
|
||||
info!("[STATE] Position jump detected, seeking to {}ms (local={}ms)", requested, local);
|
||||
let mut should_seek = requested == 0
|
||||
|| requested.abs_diff(local) > 350;
|
||||
let suppress_nonzero_seek = requested > 0
|
||||
&& ignore_nonzero_seek_until
|
||||
.map(|deadline| std::time::Instant::now() < deadline)
|
||||
.unwrap_or(false);
|
||||
if suppress_nonzero_seek {
|
||||
should_seek = false;
|
||||
info!(
|
||||
"[STATE] Ignoring non-zero position control {}ms during settle window",
|
||||
requested
|
||||
);
|
||||
}
|
||||
info!(
|
||||
"[STATE] position-control command: playing_state={:?} target={}ms local={}ms state={:?} track={} should_seek={}",
|
||||
playing_state,
|
||||
requested,
|
||||
local,
|
||||
status.state,
|
||||
status.track_id,
|
||||
should_seek
|
||||
);
|
||||
|
||||
if should_seek {
|
||||
info!(
|
||||
"[STATE] Position jump detected, seeking to {}ms (local={}ms)",
|
||||
requested, local
|
||||
);
|
||||
player.send(PlayerCommand::Seek(requested));
|
||||
track_ended = false;
|
||||
// ACTION_TYPE_SEEK = 8
|
||||
let action_msg = msg_renderer_action(8, Some(*position_ms));
|
||||
ws_tx.send(Message::Binary(build_payload_frame(msg_id, &action_msg).into())).await?;
|
||||
msg_id += 1;
|
||||
if requested == 0 {
|
||||
ignore_nonzero_seek_until = Some(
|
||||
std::time::Instant::now()
|
||||
+ std::time::Duration::from_secs(2),
|
||||
);
|
||||
}
|
||||
}
|
||||
current_position_ms = requested;
|
||||
} else if !loaded_new_track && !is_pause {
|
||||
if let Some(pos) = requested_pos {
|
||||
current_position_ms = pos;
|
||||
}
|
||||
}
|
||||
|
||||
// 5. Always send state update (like reference implementation)
|
||||
@@ -1138,15 +1326,16 @@ async fn run_connection(
|
||||
max_audio_quality = *quality;
|
||||
|
||||
// Confirm quality change to server
|
||||
let resp = msg_max_audio_quality_changed(*quality as u64);
|
||||
let resp = msg_max_audio_quality_changed(*quality as u64, None);
|
||||
ws_tx.send(Message::Binary(build_payload_frame(msg_id, &resp).into())).await?;
|
||||
msg_id += 1;
|
||||
|
||||
// If currently playing, restart at new quality
|
||||
if current_playing_state == 2 && current_track_id != 0 {
|
||||
let restart_pos = player.status().position_ms;
|
||||
info!("Restarting track {} at new quality format_id={}", current_track_id, format_id);
|
||||
current_buffer_state = 1; // BUFFERING
|
||||
current_position_ms = 0;
|
||||
current_position_ms = restart_pos;
|
||||
send_state!(ws_tx, msg_id);
|
||||
|
||||
let track_id_str = current_track_id.to_string();
|
||||
@@ -1155,15 +1344,74 @@ async fn run_connection(
|
||||
Err(e) => { warn!("get_track failed: {}", e); current_duration_ms }
|
||||
};
|
||||
current_duration_ms = duration_ms;
|
||||
match api.get_track_url(auth_token, &track_id_str, format_id).await {
|
||||
Ok(url) => {
|
||||
player.send(PlayerCommand::Play {
|
||||
match api.get_track_stream(auth_token, &track_id_str, format_id).await {
|
||||
Ok(stream) => {
|
||||
let (player_stream, stream_sr, stream_bits, stream_ch) = match stream {
|
||||
TrackStream::DirectUrl {
|
||||
url,
|
||||
sampling_rate_hz,
|
||||
bit_depth,
|
||||
channels,
|
||||
} => {
|
||||
(
|
||||
StreamSource::DirectUrl(url),
|
||||
sampling_rate_hz,
|
||||
bit_depth,
|
||||
channels,
|
||||
)
|
||||
}
|
||||
TrackStream::Segmented {
|
||||
url_template,
|
||||
n_segments,
|
||||
encryption_key_hex,
|
||||
sampling_rate_hz,
|
||||
bit_depth,
|
||||
channels,
|
||||
} => {
|
||||
(
|
||||
StreamSource::Segmented {
|
||||
url_template,
|
||||
n_segments,
|
||||
encryption_key_hex,
|
||||
},
|
||||
sampling_rate_hz,
|
||||
bit_depth,
|
||||
channels,
|
||||
)
|
||||
}
|
||||
};
|
||||
player.send(PlayerCommand::Play {
|
||||
stream: player_stream,
|
||||
track_id: current_track_id,
|
||||
queue_item_id: current_queue_item_id,
|
||||
duration_ms,
|
||||
start_position_ms: 0,
|
||||
start_position_ms: restart_pos,
|
||||
});
|
||||
let (fallback_sr, fallback_bits, fallback_ch) =
|
||||
quality_fallback_audio_params(*quality);
|
||||
let sr = stream_sr.unwrap_or(fallback_sr).max(1);
|
||||
let bits = stream_bits.unwrap_or(fallback_bits).max(1);
|
||||
let ch = stream_ch.unwrap_or(fallback_ch).max(1);
|
||||
let file_msg = msg_file_audio_quality_changed(
|
||||
sr as u64,
|
||||
bits as u64,
|
||||
ch as u64,
|
||||
*quality as u64,
|
||||
);
|
||||
ws_tx.send(Message::Binary(build_payload_frame(msg_id, &file_msg).into())).await?;
|
||||
msg_id += 1;
|
||||
let dev_msg = msg_device_audio_quality_changed(
|
||||
sr as u64,
|
||||
bits as u64,
|
||||
ch as u64,
|
||||
);
|
||||
ws_tx.send(Message::Binary(build_payload_frame(msg_id, &dev_msg).into())).await?;
|
||||
msg_id += 1;
|
||||
// Re-emit quality confirmation after successful restart
|
||||
// so controllers observing the currently active stream update promptly.
|
||||
let confirm = msg_max_audio_quality_changed(*quality as u64, None);
|
||||
ws_tx.send(Message::Binary(build_payload_frame(msg_id, &confirm).into())).await?;
|
||||
msg_id += 1;
|
||||
current_buffer_state = 2; // OK(2)
|
||||
info!("Restarted at format_id={}", format_id);
|
||||
}
|
||||
|
||||
21
src/types.rs
21
src/types.rs
@@ -251,6 +251,27 @@ pub struct Track {
|
||||
pub rights: Option<Rights>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct PlaybackSession {
|
||||
pub session_id: String,
|
||||
pub expires_at: Option<u64>,
|
||||
pub infos: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct FileUrlResponse {
|
||||
pub track_id: Option<i64>,
|
||||
pub duration: Option<f64>,
|
||||
pub url: Option<String>,
|
||||
pub url_template: Option<String>,
|
||||
pub n_segments: Option<u32>,
|
||||
pub format_id: Option<i32>,
|
||||
pub mime_type: Option<String>,
|
||||
pub sampling_rate: Option<f64>,
|
||||
pub bit_depth: Option<i32>,
|
||||
pub key: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct Playlist {
|
||||
pub id: u64,
|
||||
|
||||
Reference in New Issue
Block a user