Initial commit: QobuzD - Qobuz Connect renderer for Linux

Rust-based QConnect renderer with:
- QConnect WebSocket protocol (hand-rolled protobuf)
- Audio playback via Symphonia + cpal
- Play, pause, resume, volume, skip support
- Correct BufferState/PlayingState enum values per proto spec
- Server-driven queue management (no local queue)
- Periodic position reporting for track-end detection

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
joren
2026-03-31 20:38:54 +02:00
commit 3a0d6e0240
18 changed files with 7168 additions and 0 deletions

2
.cargo/config.toml Normal file
View File

@@ -0,0 +1,2 @@
[target.x86_64-unknown-linux-musl]
rustflags = ["-C", "target-feature=-crt-static"]

3
.gitignore vendored Normal file
View File

@@ -0,0 +1,3 @@
/target/
*.swp
*.swo

3599
Cargo.lock generated Normal file

File diff suppressed because it is too large Load Diff

44
Cargo.toml Normal file
View File

@@ -0,0 +1,44 @@
[package]
name = "qobuzd"
version = "0.1.0"
edition = "2021"
authors = ["QobuzD Team"]
description = "Qobuz Connect client for Linux"
license = "MIT"
[dependencies]
reqwest = { version = "0.12", features = ["json", "rustls-tls-webpki-roots", "stream", "blocking"], default-features = false }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
tokio = { version = "1", features = ["full"] }
md-5 = "0.10"
hmac = "0.12"
sha2 = "0.10"
rand = "0.8"
base64 = "0.22"
uuid = { version = "1.0", features = ["v4"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
tracing-appender = "0.2"
directories = "5.0"
thiserror = "1.0"
chrono = { version = "0.4", features = ["serde"] }
anyhow = "1.0"
clap = { version = "4.0", features = ["derive"] }
keyring = "3"
zeroize = { version = "1.5", features = ["derive"] }
aes-gcm = "0.10"
rand_chacha = "0.3"
sha1 = "0.10"
hex = "0.4"
tokio-tungstenite = { version = "0.24", features = ["native-tls", "native-tls-vendored"] }
futures-util = "0.3"
symphonia = { version = "0.5", features = ["flac", "pcm", "mp3", "aac", "ogg", "wav"] }
cpal = "0.15"
rubato = "0.15"
[profile.release]
strip = true
lto = true
codegen-units = 1
opt-level = "z"

174
PROTOCOL.md Normal file
View File

@@ -0,0 +1,174 @@
# QConnect Protocol Reference
Reverse-engineered from Qobuz Android app (JADX decompilation) and Burp traffic analysis.
## Transport Layer
WebSocket binary frames with custom framing:
- Frame = `type_byte` + `varint(body_length)` + `body`
- Frame type 1 = Auth (field 1=msg_id, field 3=jwt)
- Frame type 2 = Subscribe (field 1=msg_id, field 3=1)
- Frame type 6 = Data payload (field 1=msg_id, field 2=timestamp, field 3=1, field 5=\x02, field 7=qconnect_container)
## QConnect Container (inside frame body field 7)
- Field 3 = serialized QConnect Message
## QConnect Message
```protobuf
message Message {
MessageType message_type = 1; // field number of payload == message_type value
oneof payload {
// Renderer -> Server
Renderer.JoinSessionMessage rndr_srvr_join_session = 21;
Renderer.StateUpdatedMessage rndr_srvr_state_updated = 23;
Renderer.VolumeChangedMessage rndr_srvr_volume_changed = 25;
// Server -> Renderer
Renderer.SetStateMessage srvr_rndr_set_state = 41;
Renderer.SetVolumeMessage srvr_rndr_set_volume = 42;
Renderer.SetActiveMessage srvr_rndr_set_active = 43;
Renderer.SetMaxAudioQualityMsg srvr_rndr_set_max_quality = 44;
Renderer.SetLoopModeMessage srvr_rndr_set_loop_mode = 45;
Renderer.SetShuffleModeMessage srvr_rndr_set_shuffle_mode = 46;
Renderer.MuteVolumeMessage srvr_rndr_mute_volume = 47;
// Controller -> Server
Controller.JoinSessionMessage ctrl_srvr_join_session = 61;
Controller.SetPlayerStateMsg ctrl_srvr_set_player_state = 62;
Controller.QueueLoadTracksMsg ctrl_srvr_queue_load_tracks= 66;
// Server -> Controller
// SessionStateMessage = 81;
}
}
```
## Message Definitions
### Renderer.SetStateMessage (41 - SRVR_RNDR_SET_STATE)
```protobuf
message SetStateMessage {
PlayingState playing_state = 1; // enum: 0=UNKNOWN, 1=STOPPED, 2=PLAYING, 3=PAUSED
int32 current_position = 2; // playback position (seconds)
QueueVersion queue_version = 3; // sub-message (field 1 = major version)
QueueTrackWithContext current_track = 4;
QueueTrackWithContext next_track = 5;
}
```
### Renderer.SetVolumeMessage (42 - SRVR_RNDR_SET_VOLUME)
```protobuf
message SetVolumeMessage {
int32 volume = 1; // absolute volume (0-100)
int32 volume_delta = 2; // relative change
}
```
### Renderer.SetActiveMessage (43 - SRVR_RNDR_SET_ACTIVE)
```protobuf
message SetActiveMessage {
bool active = 1;
}
```
### Common.QueueTrackWithContext
```protobuf
message QueueTrackWithContext {
int32 queue_item_id = 1;
int32 track_id = 2;
bytes context_uuid = 3; // optional
}
```
### Common.PlaybackPosition
```protobuf
message PlaybackPosition {
fixed64 timestamp = 1; // epoch millis
int32 value = 2; // position in seconds
}
```
### RendererState (sent by renderer to report its state)
```protobuf
message RendererState {
PlayingState playing_state = 1;
BufferState buffer_state = 2; // enum: 0=UNKNOWN, 1=BUFFERING, 2=OK, 3=ERROR, 4=UNDERRUN
PlaybackPosition current_position = 3;
int32 duration = 4; // seconds
QueueVersion queue_version = 5; // sub-message (field 1 = major)
int32 current_queue_item_id = 6;
int32 next_queue_item_id = 7;
}
```
### Renderer.JoinSessionMessage (21 - RNDR_SRVR_JOIN_SESSION)
```protobuf
message JoinSessionMessage {
bytes session_uuid = 1;
DeviceInfo device_info = 2;
RendererState initial_state = 4;
bool is_active = 5;
}
```
### Renderer.StateUpdatedMessage (23 - RNDR_SRVR_STATE_UPDATED)
```protobuf
message StateUpdatedMessage {
RendererState state = 1;
}
```
### Renderer.VolumeChangedMessage (25 - RNDR_SRVR_VOLUME_CHANGED)
```protobuf
message VolumeChangedMessage {
int32 volume = 1;
}
```
### Controller.JoinSessionMessage (61 - CTRL_SRVR_JOIN_SESSION)
```protobuf
message JoinSessionMessage {
DeviceInfo device_info = 2;
}
```
### Common.DeviceInfo
```protobuf
message DeviceInfo {
bytes device_uuid = 1;
string friendly_name = 2;
string brand = 3;
string model = 4;
string serial_number = 5;
DeviceType type = 6; // enum: 5=COMPUTER
Capabilities capabilities = 7;
string software_version = 8;
}
```
### Common.Capabilities
```protobuf
message Capabilities {
int32 audio_quality = 1;
int32 field_2 = 2; // observed value: 4
int32 field_3 = 3; // observed value: 2
}
```
## PlayingState Enum
- 0 = UNKNOWN (idle sync, ignore)
- 1 = STOPPED
- 2 = PLAYING
- 3 = PAUSED
## Connection Flow
1. Get JWT + WS endpoint via `GET /api.json/0.2/qws/getToken`
2. Open ctrl WebSocket, send Auth (frame type 1) + Subscribe (frame type 2) + CTRL_SRVR_JOIN_SESSION (61)
3. Receive SRVR_CTRL_SESSION_STATE (81) containing session_uuid
4. Open renderer WebSocket, send Auth + Subscribe + RNDR_SRVR_JOIN_SESSION (21) with session_uuid
5. Receive SET_ACTIVE (43) confirming join
6. Enter main loop: handle SET_STATE (41), SET_VOLUME (42), etc.
7. Report state changes via STATE_UPDATED (23) and VOLUME_CHANGED (25)
8. Send heartbeat state updates every 5 seconds

172
decode_burp.py Normal file
View File

@@ -0,0 +1,172 @@
#!/usr/bin/env python3
"""Decode QConnect WebSocket messages from Burp capture."""
import json, sys
def decode_varint(data, pos):
val = 0; shift = 0
while pos < len(data):
b = data[pos]; pos += 1
val |= (b & 0x7F) << shift
if not (b & 0x80): return val, pos
shift += 7
return val, pos
def parse_fields(data):
fields = []; pos = 0
while pos < len(data):
if pos >= len(data): break
tag, pos = decode_varint(data, pos)
fnum = tag >> 3; wt = tag & 7
if wt == 0:
val, pos = decode_varint(data, pos)
fields.append((fnum, wt, val.to_bytes(8, 'little')))
elif wt == 1:
fields.append((fnum, wt, data[pos:pos+8])); pos += 8
elif wt == 2:
ln, pos = decode_varint(data, pos)
fields.append((fnum, wt, data[pos:pos+ln])); pos += ln
elif wt == 5:
fields.append((fnum, wt, data[pos:pos+4])); pos += 4
else: break
return fields
def get_varint(fields, num):
for f, w, d in fields:
if f == num and w == 0:
return int.from_bytes(d[:8], 'little')
return None
def get_bytes(fields, num):
for f, w, d in fields:
if f == num and w == 2: return d
return None
def get_fixed32(fields, num):
for f, w, d in fields:
if f == num and w == 5: return int.from_bytes(d[:4], 'little')
return None
def get_fixed64(fields, num):
for f, w, d in fields:
if f == num and w == 1: return int.from_bytes(d[:8], 'little')
return None
MSG_NAMES = {
1: "ERROR", 21: "RNDR_JOIN", 23: "STATE_UPDATED", 25: "VOLUME_CHANGED",
28: "QUALITY_CHANGED", 29: "VOLUME_MUTED", 41: "SET_STATE", 42: "SET_VOLUME",
43: "SET_ACTIVE", 44: "SET_QUALITY", 45: "SET_LOOP", 46: "SET_SHUFFLE",
47: "MUTE_VOLUME", 61: "CTRL_JOIN", 66: "QUEUE_REQ", 67: "QUEUE_REQ2",
75: "QUEUE_UPDATE", 76: "SESSION_SETUP", 77: "ACK_JOIN", 79: "TRACK_LIST",
82: "SRV_STATE_ECHO", 83: "DEVICE_LIST", 86: "SRV_VOL_ECHO",
87: "SRV_VOL_ECHO2", 90: "SRV_QUEUE_INFO", 91: "SRV_QUEUE_RESP",
97: "SRV_ACK", 98: "SRV_ACK2", 99: "SRV_QUALITY_ECHO",
100: "SRV_QUALITY_INFO", 103: "SRV_TRACKLIST_RESP"
}
def decode_renderer_state(data):
"""Decode RendererState proto"""
f = parse_fields(data)
ps = get_varint(f, 1)
bs = get_varint(f, 2)
pos_data = get_bytes(f, 3)
dur = get_varint(f, 4)
qi = get_varint(f, 6)
nqi = get_varint(f, 7)
pos_ms = None; ts = None
if pos_data:
pf = parse_fields(pos_data)
ts = get_fixed64(pf, 1)
pos_ms = get_varint(pf, 2)
return f"playing={ps} buffer={bs} pos={pos_ms}ms dur={dur}ms qi={qi} nqi={nqi}"
def decode_set_state(data):
"""Decode SET_STATE (type 41) payload"""
f = parse_fields(data)
ps = get_varint(f, 1)
pos = get_varint(f, 2)
qv = get_bytes(f, 3)
ct = get_bytes(f, 4)
nt = get_bytes(f, 5)
qv_str = ""
if qv:
qvf = parse_fields(qv)
qv_str = f" qver={get_varint(qvf, 1)}.{get_varint(qvf, 2)}"
ct_str = ""
if ct:
ctf = parse_fields(ct)
ct_str = f" cur_track(qi={get_varint(ctf, 1)} tid={get_fixed32(ctf, 2)})"
nt_str = ""
if nt:
ntf = parse_fields(nt)
nt_str = f" next_track(qi={get_varint(ntf, 1)} tid={get_fixed32(ntf, 2)})"
return f"playing_state={'None' if ps is None else ps} pos={pos}{qv_str}{ct_str}{nt_str}"
def process_message(payload_str, direction):
data = payload_str.encode('latin-1')
if len(data) < 2: return
# Decode frame layer
pos = 0
while pos < len(data):
if pos >= len(data): break
ft = data[pos]; pos += 1
flen, pos = decode_varint(data, pos)
if pos + flen > len(data): break
fbody = data[pos:pos+flen]; pos += flen
if ft != 6: continue # only data frames
# Parse frame body fields
ff = parse_fields(fbody)
f7 = get_bytes(ff, 7)
if not f7: continue
# Inside f7, field 3 = QConnect message
cf = parse_fields(f7)
for cfnum, cwt, cdata in cf:
if cfnum != 3 or cwt != 2: continue
mf = parse_fields(cdata)
mt = get_varint(mf, 1)
if mt is None: continue
name = MSG_NAMES.get(mt, f"TYPE_{mt}")
payload = get_bytes(mf, mt)
extra = ""
if mt == 23 and payload: # STATE_UPDATED
# Unwrap field 1 (RendererState wrapper)
sf = parse_fields(payload)
state_data = get_bytes(sf, 1)
if state_data:
extra = " " + decode_renderer_state(state_data)
else:
extra = " " + decode_renderer_state(payload)
elif mt == 41 and payload: # SET_STATE
extra = " " + decode_set_state(payload)
elif mt == 82 and payload: # SRV_STATE_ECHO
sf = parse_fields(payload)
state_data = get_bytes(sf, 1)
if state_data:
extra = " " + decode_renderer_state(state_data)
arrow = ">>>" if "CLIENT" in direction else "<<<"
print(f" {arrow} {name}({mt}){extra}")
# Read from burp JSON
import subprocess
# Just process messages passed on stdin
for line in sys.stdin:
line = line.strip()
if not line: continue
try:
msg = json.loads(line)
d = msg.get('direction', '')
p = msg.get('payload', '')
process_message(p, d)
except: pass

445
src/api.rs Normal file
View File

@@ -0,0 +1,445 @@
use crate::config::Config;
use crate::crypto;
use crate::error::{QobuzError, Result};
use crate::types::*;
use reqwest::Client;
use std::time::Duration;
#[derive(Clone)]
pub struct QobuzApi {
client: Client,
base_url: String,
app_id: String,
device_id: String,
device_name: String,
session_id: String,
}
impl QobuzApi {
pub fn new(config: &Config) -> Self {
let client = Client::builder()
.timeout(Duration::from_secs(30))
.user_agent("Qobuzd/0.1.0")
.build()
.expect("Failed to create HTTP client");
Self {
client,
base_url: "https://www.qobuz.com".to_string(),
app_id: config.app_id.clone(),
device_id: config.device_id.clone(),
device_name: config.device_name.clone(),
session_id: config.session_id.clone(),
}
}
pub fn get_device_id(&self) -> &str {
&self.device_id
}
pub fn get_device_name(&self) -> &str {
&self.device_name
}
pub fn get_session_id(&self) -> &str {
&self.session_id
}
fn get_timestamp(&self) -> i64 {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs() as i64
}
fn build_auth_headers(&self, access_token: Option<&str>) -> reqwest::header::HeaderMap {
use reqwest::header::*;
let mut headers = HeaderMap::new();
headers.insert("X-Device-Platform", "linux".parse().unwrap());
headers.insert("X-Device-Model", self.device_name.parse().unwrap());
headers.insert("X-Device-Manufacturer-Id", self.device_id.parse().unwrap());
if let Some(token) = access_token {
let auth_value = format!("Bearer {}", token);
headers.insert(AUTHORIZATION, auth_value.parse().unwrap());
}
headers
}
pub async fn login(&self, email: &str, password: &str) -> Result<LoginResponse> {
let timestamp = self.get_timestamp();
let signature = crypto::generate_login_signature(email, password, &self.app_id, timestamp);
let url = format!(
"{}/api.json/0.2/oauth2/login?username={}&password={}&app_id={}&request_ts={}&request_sig={}",
self.base_url,
urlencoding::encode(email),
urlencoding::encode(password),
self.app_id,
timestamp,
signature
);
let response = self.client
.get(&url)
.header("User-Agent", "Dalvik/2.1.0 (Linux; U; Android 9; Nexus 6P Build/PQ3A.190801.002) QobuzMobileAndroid/9.7.0.3-b26022717")
.header("X-App-Id", &self.app_id)
.header("X-App-Version", "9.7.0.3")
.header("X-Device-Platform", "android")
.header("X-Device-Model", "Nexus 6P")
.header("X-Device-Os-Version", "9")
.send()
.await?;
let status = response.status();
if !status.is_success() {
let error: ErrorResponse = response.json().await.unwrap_or_else(|_| ErrorResponse {
message: Some("Login failed".to_string()),
code: Some(status.as_u16() as u32),
status: Some("error".to_string()),
errors: None,
});
return Err(QobuzError::AuthError(
error.message.unwrap_or_else(|| "Unknown error".to_string()),
));
}
let login_response: LoginResponse = response.json().await?;
Ok(login_response)
}
pub async fn refresh_token(&self, refresh_token: &str) -> Result<OAuthTokens> {
let timestamp = self.get_timestamp();
let signature = crypto::generate_request_signature(
"oauth2/token",
&[
("refresh_token", refresh_token),
("grant_type", "refresh_token"),
],
timestamp,
);
let url = format!(
"{}/api.json/0.2/oauth2/token?refresh_token={}&grant_type=refresh_token&app_id={}&request_ts={}&request_sig={}",
self.base_url,
urlencoding::encode(refresh_token),
self.app_id,
timestamp,
signature
);
let response = self.client
.get(&url)
.header("User-Agent", "Dalvik/2.1.0 (Linux; U; Android 9; Nexus 6P Build/PQ3A.190801.002) QobuzMobileAndroid/9.7.0.3-b26022717")
.header("X-App-Id", &self.app_id)
.header("X-App-Version", "9.7.0.3")
.header("X-Device-Platform", "android")
.header("X-Device-Model", "Nexus 6P")
.header("X-Device-Os-Version", "9")
.send()
.await?;
if !response.status().is_success() {
return Err(QobuzError::AuthError("Token refresh failed".to_string()));
}
let tokens: OAuthTokens = response.json().await?;
Ok(tokens)
}
pub async fn get_user(&self, access_token: &str) -> Result<User> {
let url = format!(
"{}/api.json/0.2/user/get?app_id={}",
self.base_url, self.app_id
);
let response = self
.client
.get(&url)
.headers(self.build_auth_headers(Some(access_token)))
.send()
.await?;
if !response.status().is_success() {
return Err(QobuzError::ApiError("Failed to get user".to_string()));
}
let user: User = response.json().await?;
Ok(user)
}
pub async fn get_link_token(
&self,
access_token: &str,
action: &str,
) -> Result<LinkTokenResponse> {
let timestamp = self.get_timestamp();
let signature = crypto::generate_request_signature(
"link/token",
&[
("link_action", action),
("external_device_id", &self.device_id),
],
timestamp,
);
let url = format!(
"{}/api.json/0.2/link/token?app_id={}&request_ts={}&request_sig={}",
self.base_url, self.app_id, timestamp, signature
);
let body = LinkTokenRequest {
link_action: action.to_string(),
external_device_id: self.device_id.clone(),
};
let response = self
.client
.post(&url)
.headers(self.build_auth_headers(Some(access_token)))
.json(&body)
.send()
.await?;
if !response.status().is_success() {
return Err(QobuzError::LinkError(
"Failed to get link token".to_string(),
));
}
let link_response: LinkTokenResponse = response.json().await?;
Ok(link_response)
}
pub async fn get_device_token(
&self,
access_token: &str,
link_token: &str,
link_device_id: &str,
) -> Result<DeviceTokenResponse> {
let timestamp = self.get_timestamp();
let signature = crypto::generate_request_signature(
"link/device/token",
&[
("link_token", link_token),
("link_device_id", link_device_id),
("external_device_id", &self.device_id),
],
timestamp,
);
let url = format!(
"{}/api.json/0.2/link/device/token?app_id={}&request_ts={}&request_sig={}",
self.base_url, self.app_id, timestamp, signature
);
let body = DeviceTokenRequest {
link_token: link_token.to_string(),
link_device_id: link_device_id.to_string(),
external_device_id: self.device_id.clone(),
};
let response = self
.client
.post(&url)
.headers(self.build_auth_headers(Some(access_token)))
.json(&body)
.send()
.await?;
if !response.status().is_success() {
return Err(QobuzError::LinkError(
"Failed to get device token".to_string(),
));
}
let device_response: DeviceTokenResponse = response.json().await?;
Ok(device_response)
}
pub async fn get_qws_token(&self, access_token: &str) -> Result<QwsTokenResponse> {
let timestamp = self.get_timestamp();
let signature = crypto::generate_request_signature("qws/createToken", &[], timestamp);
let url = format!(
"{}/api.json/0.2/qws/createToken?app_id={}&request_ts={}&request_sig={}",
self.base_url, self.app_id, timestamp, signature
);
let response = self
.client
.post(&url)
.headers(self.build_auth_headers(Some(access_token)))
.form(&[("jwt", "jwt_qws")])
.send()
.await?;
if !response.status().is_success() {
return Err(QobuzError::ApiError("Failed to get QWS token".to_string()));
}
let qws_response: QwsTokenResponse = response.json().await?;
Ok(qws_response)
}
pub async fn get_album(&self, access_token: &str, album_id: &str) -> Result<Album> {
let url = format!(
"{}/api.json/0.2/album/get?app_id={}&album_id={}",
self.base_url, self.app_id, album_id
);
let response = self
.client
.get(&url)
.headers(self.build_auth_headers(Some(access_token)))
.send()
.await?;
if !response.status().is_success() {
return Err(QobuzError::ApiError("Failed to get album".to_string()));
}
let album: Album = response.json().await?;
Ok(album)
}
pub async fn get_track_url(
&self,
access_token: &str,
track_id: &str,
format_id: u32,
) -> Result<String> {
let timestamp = self.get_timestamp();
let format_id_str = format_id.to_string();
let signature = crypto::generate_request_signature(
"track/getFileUrl",
&[
("format_id", &format_id_str),
("intent", "stream"),
("track_id", track_id),
],
timestamp,
);
let url = format!(
"{}/api.json/0.2/track/getFileUrl?app_id={}&track_id={}&format_id={}&intent=stream&request_ts={}&request_sig={}",
self.base_url, self.app_id, track_id, format_id, timestamp, signature
);
let response = self
.client
.get(&url)
.headers(self.build_auth_headers(Some(access_token)))
.send()
.await?;
if !response.status().is_success() {
let status = response.status();
let body = response.text().await.unwrap_or_default();
return Err(QobuzError::ApiError(format!(
"Failed to get track URL: {} - {}",
status, body
)));
}
#[derive(serde::Deserialize)]
struct TrackUrlResponse {
url: String,
}
let url_response: TrackUrlResponse = response.json().await?;
Ok(url_response.url)
}
pub async fn get_track(&self, access_token: &str, track_id: &str) -> Result<Track> {
let timestamp = self.get_timestamp();
let signature =
crypto::generate_request_signature("track/get", &[("track_id", track_id)], timestamp);
let url = format!(
"{}/api.json/0.2/track/get?app_id={}&track_id={}&request_ts={}&request_sig={}",
self.base_url, self.app_id, track_id, timestamp, signature
);
let response = self
.client
.get(&url)
.headers(self.build_auth_headers(Some(access_token)))
.send()
.await?;
if !response.status().is_success() {
let status = response.status();
let body = response.text().await.unwrap_or_default();
return Err(QobuzError::ApiError(format!(
"Failed to get track: {} - {}",
status, body
)));
}
let track: Track = response.json().await?;
Ok(track)
}
pub async fn search(
&self,
access_token: &str,
query: &str,
search_type: &str,
limit: u32,
offset: u32,
) -> Result<serde_json::Value> {
let url = format!(
"{}/api.json/0.2/search?app_id={}&query={}&type={}&limit={}&offset={}",
self.base_url,
self.app_id,
urlencoding::encode(query),
search_type,
limit,
offset
);
let response = self
.client
.get(&url)
.headers(self.build_auth_headers(Some(access_token)))
.send()
.await?;
if !response.status().is_success() {
return Err(QobuzError::ApiError("Search failed".to_string()));
}
let results: serde_json::Value = response.json().await?;
Ok(results)
}
}
mod urlencoding {
pub fn encode(s: &str) -> String {
let mut result = String::new();
for c in s.chars() {
match c {
'A'..='Z' | 'a'..='z' | '0'..='9' | '-' | '_' | '.' | '~' => {
result.push(c);
}
_ => {
for b in c.to_string().as_bytes() {
result.push_str(&format!("%{:02X}", b));
}
}
}
}
result
}
}

144
src/auth.rs Normal file
View File

@@ -0,0 +1,144 @@
use crate::api::QobuzApi;
use crate::config::{Config, DeviceLinkCredentials};
use crate::error::{QobuzError, Result};
use crate::token::TokenManager;
use crate::types::{DeviceTokenResponse, LinkTokenResponse, User};
use std::sync::Arc;
use tokio::sync::Mutex;
pub struct QobuzAuth {
api: QobuzApi,
token_manager: TokenManager,
access_token: Arc<Mutex<Option<String>>>,
}
impl QobuzAuth {
pub fn new(config: Config) -> Self {
let api = QobuzApi::new(&config);
let token_manager = TokenManager::new(config.clone());
Self {
api,
token_manager,
access_token: Arc::new(Mutex::new(None)),
}
}
pub async fn login_with_credentials(&self, email: &str, password: &str) -> Result<User> {
let response = self.api.login(email, password).await?;
let tokens = response.oauth;
self.token_manager.store_tokens(&tokens)?;
{
let mut token = self.access_token.lock().await;
*token = Some(tokens.access_token.clone());
}
Ok(response.user)
}
pub async fn refresh_access_token(&self) -> Result<()> {
let config =
crate::config::Config::load().map_err(|e| QobuzError::ConfigError(e.to_string()))?;
if let Some(creds) = config.credentials {
let new_tokens = self.api.refresh_token(&creds.refresh_token).await?;
self.token_manager.store_tokens(&new_tokens)?;
let mut token = self.access_token.lock().await;
*token = Some(new_tokens.access_token);
} else {
return Err(QobuzError::TokenError(
"No refresh token available".to_string(),
));
}
Ok(())
}
pub async fn get_valid_token(&self) -> Result<String> {
if let Some(token) = self.access_token.lock().await.clone() {
return Ok(token);
}
if let Some(tokens) = self.token_manager.load_tokens()? {
let mut token = self.access_token.lock().await;
*token = Some(tokens.access_token.clone());
return Ok(tokens.access_token);
}
Err(QobuzError::TokenError(
"No valid token available".to_string(),
))
}
pub async fn logout(&self) -> Result<()> {
let mut token = self.access_token.lock().await;
*token = None;
let mut config =
crate::config::Config::load().map_err(|e| QobuzError::ConfigError(e.to_string()))?;
config.clear_credentials()?;
Ok(())
}
pub async fn start_device_linking(&self) -> Result<LinkTokenResponse> {
let token = self.get_valid_token().await?;
let response = self.api.get_link_token(&token, "signIn").await?;
Ok(response)
}
pub async fn get_device_token(
&self,
link_token: &str,
link_device_id: &str,
) -> Result<DeviceTokenResponse> {
let token = self.get_valid_token().await?;
let response = self
.api
.get_device_token(&token, link_token, link_device_id)
.await?;
if let Some(oauth) = &response.oauth {
let creds = DeviceLinkCredentials {
device_access_token: oauth.access_token.clone(),
device_refresh_token: oauth.refresh_token.clone(),
device_id: self.api.get_device_id().to_string(),
link_device_id: link_device_id.to_string(),
expires_at: None,
};
self.token_manager.store_device_link_credentials(creds)?;
}
Ok(response)
}
pub async fn check_device_link_status(
&self,
link_token: &str,
link_device_id: &str,
) -> Result<DeviceTokenResponse> {
self.get_device_token(link_token, link_device_id).await
}
pub fn get_device_id(&self) -> &str {
self.api.get_device_id()
}
pub fn get_device_name(&self) -> &str {
self.api.get_device_name()
}
pub async fn is_linked(&self) -> bool {
self.token_manager
.load_device_link_credentials()
.map(|c| c.is_some())
.unwrap_or(false)
}
pub async fn unlink_device(&self) -> Result<()> {
self.token_manager.clear_device_link_credentials()
}
}

102
src/config.rs Normal file
View File

@@ -0,0 +1,102 @@
use crate::crypto;
use crate::error::{QobuzError, Result};
use directories::ProjectDirs;
use serde::{Deserialize, Serialize};
use std::path::PathBuf;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Config {
pub app_id: String,
pub device_id: String,
pub session_id: String,
pub device_name: String,
pub cache_dir: PathBuf,
pub config_dir: PathBuf,
pub credentials: Option<StoredCredentials>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StoredCredentials {
pub access_token: String,
pub refresh_token: String,
pub user_id: Option<u64>,
pub expires_at: Option<i64>,
pub email: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeviceLinkCredentials {
pub device_access_token: String,
pub device_refresh_token: String,
pub device_id: String,
pub link_device_id: String,
pub expires_at: Option<i64>,
}
impl Config {
pub fn new(device_name: String) -> Result<Self> {
let proj_dirs = ProjectDirs::from("com", "qobuz", "qobuzd").ok_or_else(|| {
QobuzError::ConfigError("Could not determine config directory".into())
})?;
let config_dir = proj_dirs.config_dir().to_path_buf();
let cache_dir = proj_dirs.cache_dir().to_path_buf();
std::fs::create_dir_all(&config_dir)?;
std::fs::create_dir_all(&cache_dir)?;
let device_id = crypto::generate_device_id();
let session_id = crypto::generate_session_id();
Ok(Self {
app_id: "312369995".to_string(),
device_id,
session_id,
device_name,
cache_dir,
config_dir,
credentials: None,
})
}
pub fn load() -> Result<Self> {
let proj_dirs = ProjectDirs::from("com", "qobuz", "qobuzd").ok_or_else(|| {
QobuzError::ConfigError("Could not determine config directory".into())
})?;
let config_path = proj_dirs.config_dir().join("config.json");
if config_path.exists() {
let content = std::fs::read_to_string(&config_path)?;
let config: Config = serde_json::from_str(&content)?;
Ok(config)
} else {
Self::new("qobuzd".to_string())
}
}
pub fn save(&self) -> Result<()> {
let config_path = self.config_dir.join("config.json");
let content = serde_json::to_string_pretty(self)?;
std::fs::write(config_path, content)?;
Ok(())
}
pub fn store_credentials(&mut self, creds: StoredCredentials) -> Result<()> {
self.credentials = Some(creds);
self.save()
}
pub fn clear_credentials(&mut self) -> Result<()> {
self.credentials = None;
self.save()
}
pub fn credentials_path(&self) -> PathBuf {
self.config_dir.join("credentials.enc")
}
pub fn device_link_credentials_path(&self) -> PathBuf {
self.config_dir.join("device_link.json")
}
}

145
src/connect.proto Normal file
View File

@@ -0,0 +1,145 @@
syntax = "proto3";
package qobuz.connect;
message DeviceInfo {
string device_id = 1;
string device_name = 2;
string device_type = 3;
string firmware_version = 4;
string ip_address = 5;
int32 port = 6;
Capabilities capabilities = 7;
}
message Capabilities {
bool supports_video = 1;
bool supports_audio = 2;
bool supports_image = 3;
repeated string supported_formats = 4;
}
message ControlMessage {
string message_id = 1;
MessageType type = 2;
oneof payload {
PlayRequest play = 10;
PauseRequest pause = 11;
StopRequest stop = 12;
SeekRequest seek = 13;
VolumeRequest volume = 14;
GetStatusRequest get_status = 15;
LoadRequest load = 16;
}
}
enum MessageType {
UNKNOWN = 0;
PLAY = 1;
PAUSE = 2;
STOP = 3;
SEEK = 4;
VOLUME = 5;
GET_STATUS = 6;
LOAD = 7;
STATUS = 100;
ERROR = 101;
CONNECTED = 102;
DISCONNECTED = 103;
}
message PlayRequest {
string track_url = 1;
int64 position_ms = 2;
}
message PauseRequest {}
message StopRequest {}
message SeekRequest {
int64 position_ms = 1;
}
message VolumeRequest {
int32 volume = 1; // 0-100
bool mute = 2;
}
message GetStatusRequest {}
message LoadRequest {
string track_id = 1;
string album_id = 2;
int32 format_id = 3;
int64 position_ms = 4;
}
message ControlResponse {
string message_id = 1;
MessageType type = 2;
bool success = 3;
string error_message = 4;
oneof payload {
StatusResponse status = 10;
}
}
message StatusResponse {
PlaybackState state = 1;
string track_id = 2;
string album_id = 3;
string track_url = 4;
int64 position_ms = 5;
int64 duration_ms = 6;
int32 volume = 7;
bool muted = 8;
TrackInfo track_info = 9;
}
enum PlaybackState {
IDLE = 0;
LOADING = 1;
PLAYING = 2;
PAUSED = 3;
BUFFERING = 4;
ERROR = 5;
}
message TrackInfo {
string title = 1;
string artist = 2;
string album = 3;
string album_artist = 4;
int32 track_number = 5;
int32 disc_number = 6;
int64 duration_ms = 7;
string artwork_url = 8;
string format = 9;
int32 bit_depth = 10;
int32 sample_rate = 11;
}
message LinkRequest {
string device_id = 1;
string device_name = 2;
string device_type = 3;
}
message LinkResponse {
bool success = 1;
string link_token = 2;
string error_message = 3;
}
message StreamRequest {
string track_id = 1;
int32 format_id = 2;
int64 position_ms = 3;
}
message StreamResponse {
bool success = 1;
string stream_url = 2;
string error_message = 3;
}

87
src/crypto.rs Normal file
View File

@@ -0,0 +1,87 @@
use md5::{Digest, Md5};
use sha1::Sha1;
const APP_SECRET: &str = "e79f8b9be485692b0e5f9dd895826368";
pub fn md5_hash(input: &str) -> String {
let mut hasher = Md5::new();
hasher.update(input.as_bytes());
format!("{:x}", hasher.finalize())
}
pub fn sha1_hash(input: &str) -> String {
let mut hasher = Sha1::new();
hasher.update(input.as_bytes());
hex::encode(hasher.finalize())
}
pub fn generate_request_signature(
endpoint: &str,
params: &[(&str, &str)],
timestamp: i64,
) -> String {
let endpoint_clean = endpoint.replace("/", "");
let mut param_str = params
.iter()
.map(|(k, v)| format!("{}{}", k, v))
.collect::<Vec<_>>();
param_str.sort();
let data = format!(
"{}{}{}{}",
endpoint_clean,
param_str.join(""),
timestamp,
APP_SECRET
);
md5_hash(&data)
}
pub fn generate_login_signature(
username: &str,
password: &str,
_app_id: &str,
timestamp: i64,
) -> String {
generate_request_signature(
"oauth2/login",
&[("username", username), ("password", password)],
timestamp,
)
}
pub fn generate_device_id() -> String {
use rand::Rng;
let mut rng = rand::thread_rng();
let bytes: [u8; 16] = rng.gen();
bytes
.iter()
.map(|b| format!("{:02x}", b))
.collect::<String>()
}
pub fn generate_session_id() -> String {
uuid::Uuid::new_v4().to_string()
}
pub fn generate_client_id() -> String {
uuid::Uuid::new_v4().to_string()
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_md5_hash() {
let result = md5_hash("test");
assert_eq!(result, "098f6bcd4621d373cade4e832627b4f6");
}
#[test]
fn test_request_signature() {
let sig = generate_request_signature("test", &[("a", "1"), ("b", "2")], 1234567890);
assert_eq!(sig.len(), 32);
}
}

33
src/error.rs Normal file
View File

@@ -0,0 +1,33 @@
use thiserror::Error;
#[derive(Error, Debug)]
pub enum QobuzError {
#[error("Authentication failed: {0}")]
AuthError(String),
#[error("API request failed: {0}")]
ApiError(String),
#[error("Network error: {0}")]
NetworkError(#[from] reqwest::Error),
#[error("JSON parse error: {0}")]
JsonError(#[from] serde_json::Error),
#[error("IO error: {0}")]
IoError(#[from] std::io::Error),
#[error("Token error: {0}")]
TokenError(String),
#[error("Configuration error: {0}")]
ConfigError(String),
#[error("Device linking error: {0}")]
LinkError(String),
#[error("Crypto error: {0}")]
CryptoError(String),
}
pub type Result<T> = std::result::Result<T, QobuzError>;

9
src/lib.rs Normal file
View File

@@ -0,0 +1,9 @@
pub mod api;
pub mod auth;
pub mod config;
pub mod crypto;
pub mod error;
pub mod player;
pub mod qconnect;
pub mod token;
pub mod types;

228
src/main.rs Normal file
View File

@@ -0,0 +1,228 @@
use anyhow::Result;
use clap::{Parser, Subcommand};
use std::sync::Arc;
use tokio::sync::Mutex;
use tracing::{error, info, Level};
use tracing_subscriber::FmtSubscriber;
use qobuzd::api::QobuzApi;
use qobuzd::auth::QobuzAuth;
use qobuzd::config::Config;
use qobuzd::qconnect::QConnect;
#[derive(Parser)]
#[command(name = "qobuzd")]
#[command(about = "Qobuz Connect client for Linux")]
struct Cli {
#[arg(short, long, default_value = "qobuzd")]
name: String,
#[command(subcommand)]
command: Commands,
#[arg(short, long, default_value = "info")]
log_level: String,
}
#[derive(Subcommand)]
enum Commands {
Login {
#[arg(short, long)]
email: String,
#[arg(short, long)]
password: String,
},
Logout,
Status,
User,
Search {
#[arg(short, long)]
query: String,
#[arg(short, long, default_value = "albums")]
search_type: String,
},
Album {
#[arg(short, long)]
album_id: String,
},
Stream {
#[arg(short, long)]
track_id: String,
#[arg(short, long, default_value = "5")]
format_id: u32,
},
Serve,
}
#[tokio::main]
async fn main() -> Result<()> {
let cli = Cli::parse();
let level = match cli.log_level.as_str() {
"debug" | "trace" => Level::DEBUG,
"warn" => Level::WARN,
"error" => Level::ERROR,
_ => Level::INFO,
};
tracing::subscriber::set_global_default(
FmtSubscriber::builder()
.with_max_level(level)
.with_target(false)
.with_thread_ids(false)
.with_file(true)
.with_line_number(true)
.finish(),
)
.expect("failed to set subscriber");
let config = Config::new(cli.name.clone())?;
let auth = Arc::new(Mutex::new(QobuzAuth::new(config.clone())));
match cli.command {
Commands::Login { email, password } => {
println!("Logging in as {}...", email);
let auth_guard = auth.lock().await;
match auth_guard.login_with_credentials(&email, &password).await {
Ok(user) => {
println!(
"Logged in as: {} (id: {})",
user.display_name.unwrap_or_default(),
user.id
);
}
Err(e) => {
error!("Login failed: {}", e);
std::process::exit(1);
}
}
}
Commands::Logout => {
let guard = auth.lock().await;
guard.logout().await?;
println!("Logged out");
}
Commands::Status => {
let guard = auth.lock().await;
if guard.is_linked().await {
println!("Device is linked");
} else {
println!("Device is not linked");
}
}
Commands::User => {
let guard = auth.lock().await;
let token = guard.get_valid_token().await?;
drop(guard);
let api = QobuzApi::new(&config);
match api.get_user(&token).await {
Ok(user) => {
println!("User: {}", user.display_name.unwrap_or_default());
println!("Email: {}", user.email);
if let Some(sub) = &user.subscription {
println!("Subscription: {}", sub.offer);
}
}
Err(e) => {
error!("Failed: {}", e);
std::process::exit(1);
}
}
}
Commands::Search { query, search_type } => {
let guard = auth.lock().await;
let token = guard.get_valid_token().await?;
drop(guard);
let api = QobuzApi::new(&config);
match api.search(&token, &query, &search_type, 10, 0).await {
Ok(results) => {
println!("{}", serde_json::to_string_pretty(&results)?);
}
Err(e) => {
error!("Search failed: {}", e);
std::process::exit(1);
}
}
}
Commands::Album { album_id } => {
let guard = auth.lock().await;
let token = guard.get_valid_token().await?;
drop(guard);
let api = QobuzApi::new(&config);
match api.get_album(&token, &album_id).await {
Ok(album) => {
println!("Album: {}", album.title);
if let Some(artists) = &album.artists {
if let Some(a) = artists.first() {
println!("Artist: {}", a.name);
}
}
println!("Tracks: {}", album.track_count.unwrap_or(0));
}
Err(e) => {
error!("Failed: {}", e);
std::process::exit(1);
}
}
}
Commands::Stream {
track_id,
format_id,
} => {
let guard = auth.lock().await;
let token = guard.get_valid_token().await?;
drop(guard);
let api = QobuzApi::new(&config);
match api.get_track_url(&token, &track_id, format_id).await {
Ok(url) => println!("Stream URL: {}", url),
Err(e) => {
error!("Failed: {}", e);
std::process::exit(1);
}
}
}
Commands::Serve => {
let guard = auth.lock().await;
let token = match guard.get_valid_token().await {
Ok(t) => t,
Err(e) => {
error!("Not logged in: {}", e);
println!("Run 'qobuzd login' first.");
std::process::exit(1);
}
};
drop(guard);
let device_id = config.device_id.clone();
let device_name = config.device_name.clone();
println!("Starting QobuzD as '{}'...", device_name);
let mut qconnect = QConnect::start(token, device_id, device_name);
println!("QobuzD is running. Select it in the Qobuz app to play music.");
println!("Press Ctrl+C to stop.");
// Just forward commands to stdout for visibility
tokio::spawn(async move {
loop {
if let Some(cmd) = qconnect.poll_command() {
info!("Command received: {:?}", cmd);
}
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
}
});
tokio::signal::ctrl_c().await?;
println!("\nStopped.");
}
}
Ok(())
}

468
src/player.rs Normal file
View File

@@ -0,0 +1,468 @@
use std::io::{self, Read, Seek, SeekFrom};
use std::sync::atomic::{AtomicBool, AtomicI32, AtomicU64, AtomicU8, Ordering};
use std::sync::Arc;
use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
use symphonia::core::audio::SampleBuffer;
use symphonia::core::codecs::DecoderOptions;
use symphonia::core::formats::FormatOptions;
use symphonia::core::io::{MediaSource, MediaSourceStream};
use symphonia::core::meta::MetadataOptions;
use symphonia::core::probe::Hint;
use tokio::sync::mpsc;
use tracing::{error, info, warn};
#[derive(Debug)]
pub enum PlayerCommand {
Play {
url: String,
track_id: i32,
queue_item_id: i32,
duration_ms: u64,
},
Resume,
Pause,
Stop,
SetVolume(u8),
}
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum PlayerState {
Stopped,
Playing,
Paused,
}
pub struct PlayerStatus {
pub state: PlayerState,
pub position_ms: u64,
pub duration_ms: u64,
pub track_id: i32,
pub queue_item_id: i32,
pub volume: u8,
}
struct SharedState {
playing: AtomicBool,
paused: AtomicBool,
stop_signal: AtomicBool,
generation: AtomicU64, // incremented on each Play, used to avoid old threads clobbering state
position_ms: AtomicU64,
duration_ms: AtomicU64,
volume: AtomicU8,
track_id: AtomicI32,
queue_item_id: AtomicI32,
}
pub struct AudioPlayer {
cmd_tx: mpsc::UnboundedSender<PlayerCommand>,
shared: Arc<SharedState>,
}
impl AudioPlayer {
pub fn new() -> Self {
let shared = Arc::new(SharedState {
playing: AtomicBool::new(false),
paused: AtomicBool::new(false),
stop_signal: AtomicBool::new(false),
generation: AtomicU64::new(0),
position_ms: AtomicU64::new(0),
duration_ms: AtomicU64::new(0),
volume: AtomicU8::new(100),
track_id: AtomicI32::new(0),
queue_item_id: AtomicI32::new(0),
});
let (cmd_tx, cmd_rx) = mpsc::unbounded_channel::<PlayerCommand>();
let shared_clone = shared.clone();
std::thread::spawn(move || {
player_thread(cmd_rx, shared_clone);
});
Self { cmd_tx, shared }
}
pub fn send(&self, cmd: PlayerCommand) {
let _ = self.cmd_tx.send(cmd);
}
pub fn status(&self) -> PlayerStatus {
let playing = self.shared.playing.load(Ordering::Relaxed);
let paused = self.shared.paused.load(Ordering::Relaxed);
let state = if playing && !paused {
PlayerState::Playing
} else if playing && paused {
PlayerState::Paused
} else {
PlayerState::Stopped
};
PlayerStatus {
state,
position_ms: self.shared.position_ms.load(Ordering::Relaxed),
duration_ms: self.shared.duration_ms.load(Ordering::Relaxed),
track_id: self.shared.track_id.load(Ordering::Relaxed),
queue_item_id: self.shared.queue_item_id.load(Ordering::Relaxed),
volume: self.shared.volume.load(Ordering::Relaxed),
}
}
}
fn player_thread(mut cmd_rx: mpsc::UnboundedReceiver<PlayerCommand>, shared: Arc<SharedState>) {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("Failed to build tokio runtime for player");
loop {
let cmd = match rt.block_on(cmd_rx.recv()) {
Some(c) => c,
None => break,
};
match cmd {
PlayerCommand::Play {
url,
track_id,
queue_item_id,
duration_ms,
} => {
// Stop any current playback
shared.stop_signal.store(true, Ordering::SeqCst);
std::thread::sleep(std::time::Duration::from_millis(100));
shared.stop_signal.store(false, Ordering::SeqCst);
let gen = shared.generation.fetch_add(1, Ordering::SeqCst) + 1;
shared.paused.store(false, Ordering::SeqCst);
shared.position_ms.store(0, Ordering::SeqCst);
shared.duration_ms.store(duration_ms, Ordering::SeqCst);
shared.track_id.store(track_id, Ordering::SeqCst);
shared.queue_item_id.store(queue_item_id, Ordering::SeqCst);
shared.playing.store(true, Ordering::SeqCst);
let shared_play = shared.clone();
std::thread::spawn(move || {
if let Err(e) = play_stream(&url, shared_play, gen) {
error!("Playback error: {}", e);
}
});
}
PlayerCommand::Pause => {
shared.paused.store(true, Ordering::SeqCst);
}
PlayerCommand::Resume => {
shared.paused.store(false, Ordering::SeqCst);
}
PlayerCommand::Stop => {
shared.stop_signal.store(true, Ordering::SeqCst);
std::thread::sleep(std::time::Duration::from_millis(100));
shared.stop_signal.store(false, Ordering::SeqCst);
shared.playing.store(false, Ordering::SeqCst);
shared.paused.store(false, Ordering::SeqCst);
shared.track_id.store(0, Ordering::SeqCst);
shared.queue_item_id.store(0, Ordering::SeqCst);
}
PlayerCommand::SetVolume(vol) => {
shared.volume.store(vol, Ordering::SeqCst);
}
}
}
}
// ---------------------------------------------------------------------------
// HTTP streaming source (streams from network, buffers first 512KB for seeks)
// ---------------------------------------------------------------------------
const HEAD_SIZE: usize = 512 * 1024;
struct HttpStreamSource {
reader: reqwest::blocking::Response,
head: Vec<u8>,
reader_pos: u64,
pos: u64,
content_length: Option<u64>,
}
impl HttpStreamSource {
fn new(response: reqwest::blocking::Response, content_length: Option<u64>) -> Self {
Self {
reader: response,
head: Vec::new(),
reader_pos: 0,
pos: 0,
content_length,
}
}
}
impl Read for HttpStreamSource {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
let pos = self.pos as usize;
if pos < self.head.len() {
let avail = self.head.len() - pos;
let n = buf.len().min(avail);
buf[..n].copy_from_slice(&self.head[pos..pos + n]);
self.pos += n as u64;
return Ok(n);
}
let n = self.reader.read(buf)?;
if n > 0 {
if self.reader_pos < HEAD_SIZE as u64 {
let capacity: usize = HEAD_SIZE.saturating_sub(self.head.len());
let to_buf = n.min(capacity);
if to_buf > 0 {
self.head.extend_from_slice(&buf[..to_buf]);
}
}
self.reader_pos += n as u64;
self.pos += n as u64;
}
Ok(n)
}
}
impl Seek for HttpStreamSource {
fn seek(&mut self, from: SeekFrom) -> io::Result<u64> {
let cl = self.content_length.unwrap_or(u64::MAX);
let target: u64 = match from {
SeekFrom::Start(n) => n,
SeekFrom::End(n) if n < 0 => cl.saturating_sub((-n) as u64),
SeekFrom::End(_) => cl,
SeekFrom::Current(n) if n >= 0 => self.pos.saturating_add(n as u64),
SeekFrom::Current(n) => self.pos.saturating_sub((-n) as u64),
};
if target == self.pos {
return Ok(self.pos);
}
if target < self.reader_pos {
if target < self.head.len() as u64 {
self.pos = target;
return Ok(self.pos);
}
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
"backward seek past head buffer",
));
}
// Forward seek: read and discard
let mut remaining = target - self.reader_pos;
while remaining > 0 {
let mut discard = [0u8; 8192];
let want = (remaining as usize).min(discard.len());
match self.reader.read(&mut discard[..want]) {
Ok(0) => break,
Ok(n) => {
if self.reader_pos < HEAD_SIZE as u64 {
let capacity: usize = HEAD_SIZE.saturating_sub(self.head.len());
let to_buf = n.min(capacity);
if to_buf > 0 {
self.head.extend_from_slice(&discard[..to_buf]);
}
}
self.reader_pos += n as u64;
remaining -= n as u64;
}
Err(e) => return Err(e),
}
}
self.pos = self.reader_pos;
Ok(self.pos)
}
}
impl MediaSource for HttpStreamSource {
fn is_seekable(&self) -> bool {
true
}
fn byte_len(&self) -> Option<u64> {
self.content_length
}
}
// ---------------------------------------------------------------------------
// Streaming playback
// ---------------------------------------------------------------------------
fn play_stream(url: &str, shared: Arc<SharedState>, generation: u64) -> anyhow::Result<()> {
info!("Streaming audio...");
let response = reqwest::blocking::get(url)?;
let content_length = response.content_length();
let source = HttpStreamSource::new(response, content_length);
let mss = MediaSourceStream::new(Box::new(source), Default::default());
let hint = Hint::new();
let probed = symphonia::default::get_probe().format(
&hint,
mss,
&FormatOptions::default(),
&MetadataOptions::default(),
)?;
let mut format = probed.format;
let track = format
.tracks()
.iter()
.find(|t| t.codec_params.codec != symphonia::core::codecs::CODEC_TYPE_NULL)
.ok_or_else(|| anyhow::anyhow!("no audio track"))?
.clone();
let track_id = track.id;
let sample_rate = track.codec_params.sample_rate.unwrap_or(44100);
let channels = track.codec_params.channels.map(|c| c.count()).unwrap_or(2);
// Update duration from codec if available (and not already set from API)
if shared.duration_ms.load(Ordering::Relaxed) == 0 {
if let Some(n_frames) = track.codec_params.n_frames {
let dur_ms = (n_frames as f64 / sample_rate as f64 * 1000.0) as u64;
shared.duration_ms.store(dur_ms, Ordering::SeqCst);
}
}
let mut decoder =
symphonia::default::get_codecs().make(&track.codec_params, &DecoderOptions::default())?;
// Set up cpal output
let host = cpal::default_host();
let device = host
.default_output_device()
.ok_or_else(|| anyhow::anyhow!("no audio output device"))?;
info!("Audio output: {}", device.name().unwrap_or_default());
let (sample_tx, sample_rx) = std::sync::mpsc::sync_channel::<Vec<f32>>(32);
let config = cpal::StreamConfig {
channels: channels as u16,
sample_rate: cpal::SampleRate(sample_rate),
buffer_size: cpal::BufferSize::Default,
};
let shared_out = shared.clone();
let mut ring_buf: Vec<f32> = Vec::new();
let mut ring_pos = 0;
let stream = device.build_output_stream(
&config,
move |out: &mut [f32], _: &cpal::OutputCallbackInfo| {
let vol = shared_out.volume.load(Ordering::Relaxed) as f32 / 100.0;
let paused = shared_out.paused.load(Ordering::Relaxed);
for sample in out.iter_mut() {
if paused {
*sample = 0.0;
continue;
}
if ring_pos >= ring_buf.len() {
match sample_rx.try_recv() {
Ok(buf) => {
ring_buf = buf;
ring_pos = 0;
}
Err(_) => {
*sample = 0.0;
continue;
}
}
}
*sample = ring_buf[ring_pos] * vol;
ring_pos += 1;
}
},
|err| error!("cpal error: {}", err),
None,
)?;
stream.play()?;
info!("Playback started ({}Hz, {}ch)", sample_rate, channels);
loop {
// Check if superseded by a newer Play command (generation changed)
if shared.generation.load(Ordering::SeqCst) != generation {
info!("Playback superseded by newer generation");
break;
}
if shared.stop_signal.load(Ordering::Relaxed) {
info!("Playback stopped by signal");
break;
}
while shared.paused.load(Ordering::Relaxed) {
std::thread::sleep(std::time::Duration::from_millis(50));
if shared.stop_signal.load(Ordering::Relaxed)
|| shared.generation.load(Ordering::SeqCst) != generation
{
break;
}
}
if shared.stop_signal.load(Ordering::Relaxed)
|| shared.generation.load(Ordering::SeqCst) != generation
{
break;
}
let packet = match format.next_packet() {
Ok(p) => p,
Err(symphonia::core::errors::Error::IoError(ref e))
if e.kind() == std::io::ErrorKind::UnexpectedEof =>
{
info!("Playback finished (gen={})", generation);
break;
}
Err(symphonia::core::errors::Error::ResetRequired) => {
decoder.reset();
continue;
}
Err(e) => {
warn!("Packet error: {}", e);
break;
}
};
if packet.track_id() != track_id {
continue;
}
// Update position from packet timestamp (ts is in codec timebase units = samples)
let pos_ms = (packet.ts() as f64 / sample_rate as f64 * 1000.0) as u64;
shared.position_ms.store(pos_ms, Ordering::Relaxed);
let decoded = match decoder.decode(&packet) {
Ok(d) => d,
Err(symphonia::core::errors::Error::DecodeError(e)) => {
warn!("Decode error: {}", e);
continue;
}
Err(e) => {
warn!("Decode error: {}", e);
break;
}
};
let spec = *decoded.spec();
let n_frames = decoded.frames();
let mut sample_buf = SampleBuffer::<f32>::new(n_frames as u64, spec);
sample_buf.copy_interleaved_ref(decoded);
if sample_tx.send(sample_buf.samples().to_vec()).is_err() {
break;
}
}
// Let audio buffer drain
std::thread::sleep(std::time::Duration::from_millis(300));
drop(stream);
// Only clear playing state if we're still the current generation
// (if generation changed, a new Play command has taken over — don't clobber its state)
if shared.generation.load(Ordering::SeqCst) == generation {
shared.playing.store(false, Ordering::SeqCst);
shared.paused.store(false, Ordering::SeqCst);
}
Ok(())
}

1148
src/qconnect.rs Normal file

File diff suppressed because it is too large Load Diff

103
src/token.rs Normal file
View File

@@ -0,0 +1,103 @@
use crate::config::{Config, DeviceLinkCredentials};
use crate::error::Result;
use crate::types::OAuthTokens;
use std::time::{SystemTime, UNIX_EPOCH};
#[derive(Clone)]
pub struct TokenManager {
config: Config,
}
impl TokenManager {
pub fn new(config: Config) -> Self {
Self { config }
}
pub fn store_device_link_credentials(&self, creds: DeviceLinkCredentials) -> Result<()> {
let path = self.config.device_link_credentials_path();
let content = serde_json::to_string_pretty(&creds)?;
std::fs::write(path, content)?;
Ok(())
}
pub fn load_device_link_credentials(&self) -> Result<Option<DeviceLinkCredentials>> {
let path = self.config.device_link_credentials_path();
if path.exists() {
let content = std::fs::read_to_string(path)?;
let creds: DeviceLinkCredentials = serde_json::from_str(&content)?;
if let Some(expires_at) = creds.expires_at {
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs() as i64;
if expires_at > now {
return Ok(Some(creds));
}
}
Ok(Some(creds))
} else {
Ok(None)
}
}
pub fn clear_device_link_credentials(&self) -> Result<()> {
let path = self.config.device_link_credentials_path();
if path.exists() {
std::fs::remove_file(path)?;
}
Ok(())
}
pub fn store_tokens(&self, tokens: &OAuthTokens) -> Result<()> {
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs() as i64;
let expires_at = now + tokens.expires_in as i64;
let creds = crate::config::StoredCredentials {
access_token: tokens.access_token.clone(),
refresh_token: tokens.refresh_token.clone(),
user_id: None,
expires_at: Some(expires_at),
email: None,
};
let mut config = self.config.clone();
config.store_credentials(creds)?;
Ok(())
}
pub fn load_tokens(&self) -> Result<Option<OAuthTokens>> {
let config = crate::config::Config::load()?;
if let Some(creds) = config.credentials {
if let Some(expires_at) = creds.expires_at {
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs() as i64;
if expires_at > now {
return Ok(Some(OAuthTokens {
token_type: "bearer".to_string(),
access_token: creds.access_token,
refresh_token: creds.refresh_token,
expires_in: (expires_at - now) as u64,
}));
}
}
}
Ok(None)
}
pub fn is_token_expired(&self) -> Result<bool> {
if let Some(tokens) = self.load_tokens()? {
Ok(tokens.expires_in == 0)
} else {
Ok(true)
}
}
}

262
src/types.rs Normal file
View File

@@ -0,0 +1,262 @@
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OAuthTokens {
pub token_type: String,
pub access_token: String,
pub refresh_token: String,
pub expires_in: u64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct User {
pub id: u64,
#[serde(rename = "publicId")]
pub public_id: Option<String>,
pub email: String,
pub login: String,
#[serde(rename = "firstname")]
pub first_name: Option<String>,
#[serde(rename = "lastname")]
pub last_name: Option<String>,
#[serde(rename = "display_name")]
pub display_name: Option<String>,
#[serde(rename = "country_code")]
pub country_code: Option<String>,
#[serde(rename = "language_code")]
pub language_code: Option<String>,
pub zone: Option<String>,
pub store: Option<String>,
pub country: Option<String>,
pub avatar: Option<String>,
pub subscription: Option<Subscription>,
pub credential: Option<Credential>,
#[serde(rename = "store_features")]
pub store_features: Option<StoreFeatures>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Subscription {
pub offer: String,
pub periodicity: String,
#[serde(rename = "start_date")]
pub start_date: Option<String>,
#[serde(rename = "end_date")]
pub end_date: Option<String>,
#[serde(rename = "is_canceled")]
pub is_canceled: bool,
#[serde(rename = "household_size_max")]
pub household_size_max: Option<u32>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Credential {
pub id: u64,
pub label: String,
pub description: Option<String>,
pub parameters: Option<CredentialParameters>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CredentialParameters {
#[serde(rename = "lossy_streaming")]
pub lossy_streaming: Option<bool>,
#[serde(rename = "lossless_streaming")]
pub lossless_streaming: Option<bool>,
#[serde(rename = "hires_streaming")]
pub hires_streaming: Option<bool>,
#[serde(rename = "hires_purchases_streaming")]
pub hires_purchases_streaming: Option<bool>,
#[serde(rename = "mobile_streaming")]
pub mobile_streaming: Option<bool>,
#[serde(rename = "offline_streaming")]
pub offline_streaming: Option<bool>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StoreFeatures {
pub download: bool,
pub streaming: bool,
pub editorial: bool,
pub club: bool,
pub wallet: bool,
pub weeklyq: bool,
pub autoplay: bool,
#[serde(rename = "inapp_purchase_subscripton")]
pub inapp_purchase_subscription: bool,
pub opt_in: bool,
#[serde(rename = "pre_register_opt_in")]
pub pre_register_opt_in: bool,
#[serde(rename = "pre_register_zipcode")]
pub pre_register_zipcode: bool,
#[serde(rename = "music_import")]
pub music_import: bool,
pub radio: bool,
#[serde(rename = "stream_purchase")]
pub stream_purchase: bool,
pub lyrics: bool,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LoginResponse {
pub user: User,
#[serde(rename = "oauth2")]
pub oauth: OAuthTokens,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LinkTokenRequest {
#[serde(rename = "link_action")]
pub link_action: String,
#[serde(rename = "external_device_id")]
pub external_device_id: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LinkTokenResponse {
pub status: String,
#[serde(rename = "link_token")]
pub link_token: Option<String>,
#[serde(rename = "link_device_id")]
pub link_device_id: Option<String>,
pub errors: Option<serde_json::Value>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeviceTokenRequest {
#[serde(rename = "link_token")]
pub link_token: String,
#[serde(rename = "link_device_id")]
pub link_device_id: String,
#[serde(rename = "external_device_id")]
pub external_device_id: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeviceTokenResponse {
pub status: String,
#[serde(rename = "oauth2")]
pub oauth: Option<OAuthTokens>,
#[serde(rename = "link_action")]
pub link_action: Option<String>,
pub errors: Option<serde_json::Value>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QwsTokenResponse {
#[serde(rename = "jwt_qws")]
pub jwt_qws: QwsToken,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QwsToken {
pub exp: i64,
pub jwt: String,
pub endpoint: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ApiResponse<T> {
pub status: Option<String>,
pub data: Option<T>,
pub message: Option<String>,
pub code: Option<u32>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ErrorResponse {
pub message: Option<String>,
pub code: Option<u32>,
pub status: Option<String>,
pub errors: Option<serde_json::Value>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Album {
pub id: String,
pub title: String,
pub version: Option<String>,
#[serde(rename = "track_count")]
pub track_count: Option<u32>,
pub duration: Option<u32>,
pub image: Option<AlbumImage>,
pub artists: Option<Vec<Artist>>,
pub label: Option<Label>,
pub genre: Option<Genre>,
#[serde(rename = "audio_info")]
pub audio_info: Option<AudioInfo>,
pub rights: Option<Rights>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AlbumImage {
pub small: Option<String>,
pub thumbnail: Option<String>,
pub large: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Artist {
pub id: u64,
pub name: String,
pub roles: Option<Vec<String>>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Label {
pub id: u64,
pub name: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Genre {
pub id: u64,
pub name: String,
pub path: Option<Vec<u64>>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AudioInfo {
#[serde(rename = "maximum_sampling_rate")]
pub maximum_sampling_rate: Option<f64>,
#[serde(rename = "maximum_bit_depth")]
pub maximum_bit_depth: Option<u32>,
#[serde(rename = "maximum_channel_count")]
pub maximum_channel_count: Option<u32>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Rights {
pub purchasable: Option<bool>,
pub streamable: Option<bool>,
pub downloadable: Option<bool>,
#[serde(rename = "hires_streamable")]
pub hires_streamable: Option<bool>,
#[serde(rename = "hires_purchasable")]
pub hires_purchasable: Option<bool>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Track {
pub id: u64,
pub title: String,
pub performer: Option<Artist>,
pub duration: Option<u32>,
pub track_number: Option<u32>,
pub disc_number: Option<u32>,
pub artists: Option<Vec<Artist>>,
pub album: Option<Album>,
#[serde(rename = "audio_info")]
pub audio_info: Option<AudioInfo>,
pub rights: Option<Rights>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Playlist {
pub id: u64,
pub name: String,
pub description: Option<String>,
pub tracks_count: Option<u32>,
pub image: Option<AlbumImage>,
pub user: Option<User>,
}