From 44a2494c7315d653e58aebb798d42ff16bc762c7 Mon Sep 17 00:00:00 2001 From: joren Date: Thu, 2 Apr 2026 00:47:32 +0200 Subject: [PATCH] Port QConnect to self-contained prost implementation Replaces qbz path-dependencies with a standalone implementation using tokio-tungstenite and prost. Fixes queue-clear sentinel handling, pause position reporting, track-end queue advancement, and adds periodic position progress reports to keep the app progress bar in sync. Co-Authored-By: Claude Sonnet 4.6 --- Cargo.lock | 34 +- Cargo.toml | 1 + decode_burp.py | 172 ---- src/connect.proto | 145 --- src/lib.rs | 1 + src/main.rs | 15 +- src/proto.rs | 1009 +++++++++++++++++++ src/qconnect.rs | 2455 +++++++++++++++++++++------------------------ 8 files changed, 2191 insertions(+), 1641 deletions(-) delete mode 100644 decode_burp.py delete mode 100644 src/connect.proto create mode 100644 src/proto.rs diff --git a/Cargo.lock b/Cargo.lock index 8431bb6..3301c2e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1602,6 +1602,29 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "prost" +version = "0.13.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2796faa41db3ec313a31f7624d9286acf277b52de526150b7e69f3debf891ee5" +dependencies = [ + "bytes", + "prost-derive", +] + +[[package]] +name = "prost-derive" +version = "0.13.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d" +dependencies = [ + "anyhow", + "itertools", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "qobuzd" version = "0.1.0" @@ -1622,6 +1645,7 @@ dependencies = [ "hmac", "keyring", "md-5", + "prost", "rand 0.8.5", "rand_chacha 0.3.1", "reqwest", @@ -1932,7 +1956,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys", - "windows-sys 0.61.2", + "windows-sys 0.52.0", ] [[package]] @@ -1993,9 +2017,9 @@ dependencies = [ [[package]] name = "schannel" -version = "0.1.28" +version = "0.1.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "891d81b926048e76efe18581bf793546b4c0eaf8448d72be8de2bbee5fd166e1" +checksum = "91c1b7e4904c873ef0710c1f407dde2e6287de2bebc1bbbf7d430bb7cbffd939" dependencies = [ "windows-sys 0.61.2", ] @@ -2379,10 +2403,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32497e9a4c7b38532efcdebeef879707aa9f794296a4f0244f6f69e9bc8574bd" dependencies = [ "fastrand", - "getrandom 0.4.2", + "getrandom 0.3.4", "once_cell", "rustix", - "windows-sys 0.61.2", + "windows-sys 0.52.0", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 1e63ea3..7eb899a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,6 +33,7 @@ sha1 = "0.10" hex = "0.4" tokio-tungstenite = { version = "0.24", features = ["native-tls", "native-tls-vendored"] } futures-util = "0.3" +prost = "0.13" symphonia = { version = "0.5", features = ["flac", "pcm", "mp3", "aac", "ogg", "wav"] } cpal = "0.15" rubato = "0.15" diff --git a/decode_burp.py b/decode_burp.py deleted file mode 100644 index 17e445b..0000000 --- a/decode_burp.py +++ /dev/null @@ -1,172 +0,0 @@ -#!/usr/bin/env python3 -"""Decode QConnect WebSocket messages from Burp capture.""" -import json, sys - -def decode_varint(data, pos): - val = 0; shift = 0 - while pos < len(data): - b = data[pos]; pos += 1 - val |= (b & 0x7F) << shift - if not (b & 0x80): return val, pos - shift += 7 - return val, pos - -def parse_fields(data): - fields = []; pos = 0 - while pos < len(data): - if pos >= len(data): break - tag, pos = decode_varint(data, pos) - fnum = tag >> 3; wt = tag & 7 - if wt == 0: - val, pos = decode_varint(data, pos) - fields.append((fnum, wt, val.to_bytes(8, 'little'))) - elif wt == 1: - fields.append((fnum, wt, data[pos:pos+8])); pos += 8 - elif wt == 2: - ln, pos = decode_varint(data, pos) - fields.append((fnum, wt, data[pos:pos+ln])); pos += ln - elif wt == 5: - fields.append((fnum, wt, data[pos:pos+4])); pos += 4 - else: break - return fields - -def get_varint(fields, num): - for f, w, d in fields: - if f == num and w == 0: - return int.from_bytes(d[:8], 'little') - return None - -def get_bytes(fields, num): - for f, w, d in fields: - if f == num and w == 2: return d - return None - -def get_fixed32(fields, num): - for f, w, d in fields: - if f == num and w == 5: return int.from_bytes(d[:4], 'little') - return None - -def get_fixed64(fields, num): - for f, w, d in fields: - if f == num and w == 1: return int.from_bytes(d[:8], 'little') - return None - -MSG_NAMES = { - 1: "ERROR", 21: "RNDR_JOIN", 23: "STATE_UPDATED", 25: "VOLUME_CHANGED", - 28: "QUALITY_CHANGED", 29: "VOLUME_MUTED", 41: "SET_STATE", 42: "SET_VOLUME", - 43: "SET_ACTIVE", 44: "SET_QUALITY", 45: "SET_LOOP", 46: "SET_SHUFFLE", - 47: "MUTE_VOLUME", 61: "CTRL_JOIN", 66: "QUEUE_REQ", 67: "QUEUE_REQ2", - 75: "QUEUE_UPDATE", 76: "SESSION_SETUP", 77: "ACK_JOIN", 79: "TRACK_LIST", - 82: "SRV_STATE_ECHO", 83: "DEVICE_LIST", 86: "SRV_VOL_ECHO", - 87: "SRV_VOL_ECHO2", 90: "SRV_QUEUE_INFO", 91: "SRV_QUEUE_RESP", - 97: "SRV_ACK", 98: "SRV_ACK2", 99: "SRV_QUALITY_ECHO", - 100: "SRV_QUALITY_INFO", 103: "SRV_TRACKLIST_RESP" -} - -def decode_renderer_state(data): - """Decode RendererState proto""" - f = parse_fields(data) - ps = get_varint(f, 1) - bs = get_varint(f, 2) - pos_data = get_bytes(f, 3) - dur = get_varint(f, 4) - qi = get_varint(f, 6) - nqi = get_varint(f, 7) - - pos_ms = None; ts = None - if pos_data: - pf = parse_fields(pos_data) - ts = get_fixed64(pf, 1) - pos_ms = get_varint(pf, 2) - - return f"playing={ps} buffer={bs} pos={pos_ms}ms dur={dur}ms qi={qi} nqi={nqi}" - -def decode_set_state(data): - """Decode SET_STATE (type 41) payload""" - f = parse_fields(data) - ps = get_varint(f, 1) - pos = get_varint(f, 2) - qv = get_bytes(f, 3) - ct = get_bytes(f, 4) - nt = get_bytes(f, 5) - - qv_str = "" - if qv: - qvf = parse_fields(qv) - qv_str = f" qver={get_varint(qvf, 1)}.{get_varint(qvf, 2)}" - - ct_str = "" - if ct: - ctf = parse_fields(ct) - ct_str = f" cur_track(qi={get_varint(ctf, 1)} tid={get_fixed32(ctf, 2)})" - - nt_str = "" - if nt: - ntf = parse_fields(nt) - nt_str = f" next_track(qi={get_varint(ntf, 1)} tid={get_fixed32(ntf, 2)})" - - return f"playing_state={'None' if ps is None else ps} pos={pos}{qv_str}{ct_str}{nt_str}" - -def process_message(payload_str, direction): - data = payload_str.encode('latin-1') - if len(data) < 2: return - - # Decode frame layer - pos = 0 - while pos < len(data): - if pos >= len(data): break - ft = data[pos]; pos += 1 - flen, pos = decode_varint(data, pos) - if pos + flen > len(data): break - fbody = data[pos:pos+flen]; pos += flen - - if ft != 6: continue # only data frames - - # Parse frame body fields - ff = parse_fields(fbody) - f7 = get_bytes(ff, 7) - if not f7: continue - - # Inside f7, field 3 = QConnect message - cf = parse_fields(f7) - for cfnum, cwt, cdata in cf: - if cfnum != 3 or cwt != 2: continue - mf = parse_fields(cdata) - mt = get_varint(mf, 1) - if mt is None: continue - - name = MSG_NAMES.get(mt, f"TYPE_{mt}") - payload = get_bytes(mf, mt) - - extra = "" - if mt == 23 and payload: # STATE_UPDATED - # Unwrap field 1 (RendererState wrapper) - sf = parse_fields(payload) - state_data = get_bytes(sf, 1) - if state_data: - extra = " " + decode_renderer_state(state_data) - else: - extra = " " + decode_renderer_state(payload) - elif mt == 41 and payload: # SET_STATE - extra = " " + decode_set_state(payload) - elif mt == 82 and payload: # SRV_STATE_ECHO - sf = parse_fields(payload) - state_data = get_bytes(sf, 1) - if state_data: - extra = " " + decode_renderer_state(state_data) - - arrow = ">>>" if "CLIENT" in direction else "<<<" - print(f" {arrow} {name}({mt}){extra}") - -# Read from burp JSON -import subprocess -# Just process messages passed on stdin -for line in sys.stdin: - line = line.strip() - if not line: continue - try: - msg = json.loads(line) - d = msg.get('direction', '') - p = msg.get('payload', '') - process_message(p, d) - except: pass diff --git a/src/connect.proto b/src/connect.proto deleted file mode 100644 index 2965230..0000000 --- a/src/connect.proto +++ /dev/null @@ -1,145 +0,0 @@ -syntax = "proto3"; - -package qobuz.connect; - -message DeviceInfo { - string device_id = 1; - string device_name = 2; - string device_type = 3; - string firmware_version = 4; - string ip_address = 5; - int32 port = 6; - Capabilities capabilities = 7; -} - -message Capabilities { - bool supports_video = 1; - bool supports_audio = 2; - bool supports_image = 3; - repeated string supported_formats = 4; -} - -message ControlMessage { - string message_id = 1; - MessageType type = 2; - oneof payload { - PlayRequest play = 10; - PauseRequest pause = 11; - StopRequest stop = 12; - SeekRequest seek = 13; - VolumeRequest volume = 14; - GetStatusRequest get_status = 15; - LoadRequest load = 16; - } -} - -enum MessageType { - UNKNOWN = 0; - PLAY = 1; - PAUSE = 2; - STOP = 3; - SEEK = 4; - VOLUME = 5; - GET_STATUS = 6; - LOAD = 7; - STATUS = 100; - ERROR = 101; - CONNECTED = 102; - DISCONNECTED = 103; -} - -message PlayRequest { - string track_url = 1; - int64 position_ms = 2; -} - -message PauseRequest {} - -message StopRequest {} - -message SeekRequest { - int64 position_ms = 1; -} - -message VolumeRequest { - int32 volume = 1; // 0-100 - bool mute = 2; -} - -message GetStatusRequest {} - -message LoadRequest { - string track_id = 1; - string album_id = 2; - int32 format_id = 3; - int64 position_ms = 4; -} - -message ControlResponse { - string message_id = 1; - MessageType type = 2; - bool success = 3; - string error_message = 4; - oneof payload { - StatusResponse status = 10; - } -} - -message StatusResponse { - PlaybackState state = 1; - string track_id = 2; - string album_id = 3; - string track_url = 4; - int64 position_ms = 5; - int64 duration_ms = 6; - int32 volume = 7; - bool muted = 8; - TrackInfo track_info = 9; -} - -enum PlaybackState { - IDLE = 0; - LOADING = 1; - PLAYING = 2; - PAUSED = 3; - BUFFERING = 4; - ERROR = 5; -} - -message TrackInfo { - string title = 1; - string artist = 2; - string album = 3; - string album_artist = 4; - int32 track_number = 5; - int32 disc_number = 6; - int64 duration_ms = 7; - string artwork_url = 8; - string format = 9; - int32 bit_depth = 10; - int32 sample_rate = 11; -} - -message LinkRequest { - string device_id = 1; - string device_name = 2; - string device_type = 3; -} - -message LinkResponse { - bool success = 1; - string link_token = 2; - string error_message = 3; -} - -message StreamRequest { - string track_id = 1; - int32 format_id = 2; - int64 position_ms = 3; -} - -message StreamResponse { - bool success = 1; - string stream_url = 2; - string error_message = 3; -} diff --git a/src/lib.rs b/src/lib.rs index 775a31a..fd77650 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -4,6 +4,7 @@ pub mod config; pub mod crypto; pub mod error; pub mod player; +pub mod proto; pub mod qconnect; pub mod token; pub mod types; diff --git a/src/main.rs b/src/main.rs index 152f881..824d159 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,7 +2,7 @@ use anyhow::Result; use clap::{Parser, Subcommand}; use std::sync::Arc; use tokio::sync::Mutex; -use tracing::{error, info, Level}; +use tracing::{error, Level}; use tracing_subscriber::FmtSubscriber; use qobuzd::api::{QobuzApi, TrackStream}; @@ -229,20 +229,13 @@ async fn main() -> Result<()> { println!("Starting QobuzD as '{}'...", device_name); - let mut qconnect = QConnect::start(token, device_id, device_name); + let qconnect = QConnect::start(token, device_id, device_name); println!("QobuzD is running. Select it in the Qobuz app to play music."); println!("Press Ctrl+C to stop."); - // Just forward commands to stdout for visibility - tokio::spawn(async move { - loop { - if let Some(cmd) = qconnect.poll_command() { - info!("Command received: {:?}", cmd); - } - tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; - } - }); + // QConnect handles commands internally; nothing to poll. + let _ = qconnect; tokio::signal::ctrl_c().await?; println!("\nStopped."); diff --git a/src/proto.rs b/src/proto.rs new file mode 100644 index 0000000..4c8542c --- /dev/null +++ b/src/proto.rs @@ -0,0 +1,1009 @@ +#[derive(Clone, Copy, PartialEq, Eq, Debug, ::prost::Enumeration)] +#[repr(i32)] +pub enum QConnectMessageType { + MessageTypeRndrSrvrJoinSession = 21, + MessageTypeRndrSrvrDeviceInfoUpdated = 22, + MessageTypeRndrSrvrStateUpdated = 23, + MessageTypeRndrSrvrRendererAction = 24, + MessageTypeRndrSrvrVolumeChanged = 25, + MessageTypeRndrSrvrFileAudioQualityChanged = 26, + MessageTypeRndrSrvrDeviceAudioQualityChanged = 27, + MessageTypeRndrSrvrMaxAudioQualityChanged = 28, + MessageTypeRndrSrvrVolumeMuted = 29, + MessageTypeSrvrRndrSetState = 41, + MessageTypeSrvrRndrSetVolume = 42, + MessageTypeSrvrRndrSetActive = 43, + MessageTypeSrvrRndrSetMaxAudioQuality = 44, + MessageTypeSrvrRndrSetLoopMode = 45, + MessageTypeSrvrRndrSetShuffleMode = 46, + MessageTypeSrvrRndrMuteVolume = 47, + MessageTypeCtrlSrvrJoinSession = 61, + MessageTypeCtrlSrvrSetPlayerState = 62, + MessageTypeCtrlSrvrSetActiveRenderer = 63, + MessageTypeCtrlSrvrSetVolume = 64, + MessageTypeCtrlSrvrClearQueue = 65, + MessageTypeCtrlSrvrQueueLoadTracks = 66, + MessageTypeCtrlSrvrQueueInsertTracks = 67, + MessageTypeCtrlSrvrQueueAddTracks = 68, + MessageTypeCtrlSrvrQueueRemoveTracks = 69, + MessageTypeCtrlSrvrQueueReorderTracks = 70, + MessageTypeCtrlSrvrSetShuffleMode = 71, + MessageTypeCtrlSrvrSetLoopMode = 72, + MessageTypeCtrlSrvrMuteVolume = 73, + MessageTypeCtrlSrvrSetMaxAudioQuality = 74, + MessageTypeCtrlSrvrSetQueueState = 75, + MessageTypeCtrlSrvrAskForQueueState = 76, + MessageTypeCtrlSrvrAskForRendererState = 77, + MessageTypeCtrlSrvrSetAutoplayMode = 78, + MessageTypeCtrlSrvrAutoplayLoadTracks = 79, + MessageTypeCtrlSrvrAutoplayRemoveTracks = 80, + MessageTypeSrvrCtrlSessionState = 81, + MessageTypeSrvrCtrlRendererStateUpdated = 82, + MessageTypeSrvrCtrlAddRenderer = 83, + MessageTypeSrvrCtrlUpdateRenderer = 84, + MessageTypeSrvrCtrlRemoveRenderer = 85, + MessageTypeSrvrCtrlActiveRendererChanged = 86, + MessageTypeSrvrCtrlVolumeChanged = 87, + MessageTypeSrvrCtrlQueueErrorMessage = 88, + MessageTypeSrvrCtrlQueueCleared = 89, + MessageTypeSrvrCtrlQueueState = 90, + MessageTypeSrvrCtrlQueueTracksLoaded = 91, + MessageTypeSrvrCtrlQueueTracksInserted = 92, + MessageTypeSrvrCtrlQueueTracksAdded = 93, + MessageTypeSrvrCtrlQueueTracksRemoved = 94, + MessageTypeSrvrCtrlQueueTracksReordered = 95, + MessageTypeSrvrCtrlShuffleModeSet = 96, + MessageTypeSrvrCtrlLoopModeSet = 97, + MessageTypeSrvrCtrlVolumeMuted = 98, + MessageTypeSrvrCtrlMaxAudioQualityChanged = 99, + MessageTypeSrvrCtrlFileAudioQualityChanged = 100, + MessageTypeSrvrCtrlDeviceAudioQualityChanged = 101, + MessageTypeSrvrCtrlAutoplayModeSet = 102, + MessageTypeSrvrCtrlAutoplayTracksLoaded = 103, + MessageTypeSrvrCtrlAutoplayTracksRemoved = 104, + MessageTypeSrvrCtrlQueueTracksAddedFromAutoplay = 105, +} + +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct QueueVersionRef { + #[prost(int32, optional, tag = "1")] + pub major: Option, + #[prost(int32, optional, tag = "2")] + pub minor: Option, +} + +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ErrorMessage { + /// Server sends error codes as strings (e.g. "ERROR_QUEUE_INSERT_TRACKS"), + /// not integers. Wire type = LengthDelimited (string). + #[prost(string, optional, tag = "1")] + pub code: Option, + #[prost(string, optional, tag = "2")] + pub message: Option, +} + +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct QueueTrack { + #[prost(int32, optional, tag = "1")] + pub queue_item_id: Option, + #[prost(fixed32, optional, tag = "2")] + pub track_id: Option, +} + +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct QueueTrackWithContext { + #[prost(int32, optional, tag = "1")] + pub queue_item_id: Option, + #[prost(fixed32, optional, tag = "2")] + pub track_id: Option, + #[prost(bytes = "vec", optional, tag = "3")] + pub context_uuid: Option>, +} + +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct SetQueueTrackWithContext { + #[prost(int32, optional, tag = "1")] + pub track_id: Option, + #[prost(bytes = "vec", optional, tag = "2")] + pub context_uuid: Option>, +} + +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct DeviceCapabilitiesMessage { + #[prost(int32, optional, tag = "1")] + pub min_audio_quality: Option, + #[prost(int32, optional, tag = "2")] + pub max_audio_quality: Option, + #[prost(int32, optional, tag = "3")] + pub volume_remote_control: Option, +} + +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct DeviceInfoMessage { + #[prost(bytes = "vec", optional, tag = "1")] + pub device_uuid: Option>, + #[prost(string, optional, tag = "2")] + pub friendly_name: Option, + #[prost(string, optional, tag = "3")] + pub brand: Option, + #[prost(string, optional, tag = "4")] + pub model: Option, + #[prost(string, optional, tag = "5")] + pub serial_number: Option, + #[prost(int32, optional, tag = "6")] + pub device_type: Option, + #[prost(message, optional, tag = "7")] + pub capabilities: Option, + #[prost(string, optional, tag = "8")] + pub software_version: Option, +} + +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct JoinSessionMessage { + #[prost(bytes = "vec", optional, tag = "1")] + pub session_uuid: Option>, + #[prost(message, optional, tag = "2")] + pub device_info: Option, + /// Renderer-only: reason for joining (0=unknown, 1=controller_request, 2=reconnection) + #[prost(int32, optional, tag = "3")] + pub reason: Option, + /// Renderer-only: initial playback state sent on join + #[prost(message, optional, tag = "4")] + pub initial_state: Option, + /// Renderer-only: whether this renderer is active + #[prost(bool, optional, tag = "5")] + pub is_active: Option, +} + +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct SetPlayerStateQueueItemMessage { + #[prost(message, optional, tag = "1")] + pub queue_version: Option, + #[prost(int32, optional, tag = "2")] + pub id: Option, +} + +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct SetPlayerStateMessage { + #[prost(int32, optional, tag = "1")] + pub playing_state: Option, + #[prost(int32, optional, tag = "2")] + pub current_position: Option, + #[prost(message, optional, tag = "3")] + pub current_queue_item: Option, +} + +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct SetActiveRendererMessage { + #[prost(int32, optional, tag = "1")] + pub renderer_id: Option, +} + +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct SetVolumeMessage { + #[prost(int32, optional, tag = "1")] + pub renderer_id: Option, + #[prost(int32, optional, tag = "2")] + pub volume: Option, + #[prost(int32, optional, tag = "3")] + pub volume_delta: Option, +} + +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct SetLoopModeMessage { + #[prost(int32, optional, tag = "1")] + pub loop_mode: Option, +} + +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct MuteVolumeMessage { + #[prost(int32, optional, tag = "1")] + pub renderer_id: Option, + #[prost(bool, optional, tag = "2")] + pub value: Option, +} + +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct SetMaxAudioQualityMessage { + #[prost(int32, optional, tag = "1")] + pub renderer_id: Option, + #[prost(int32, optional, tag = "2")] + pub max_audio_quality: Option, +} + +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct AskForRendererStateMessage { + #[prost(int32, optional, tag = "1")] + pub renderer_id: Option, +} + +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ClearQueueMessage { + #[prost(message, optional, tag = "1")] + pub queue_version_ref: Option, + #[prost(bytes = "vec", optional, tag = "2")] + pub action_uuid: Option>, +} + +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct QueueLoadTracksMessage { + #[prost(message, optional, tag = "1")] + pub queue_version_ref: Option, + #[prost(bytes = "vec", optional, tag = "2")] + pub action_uuid: Option>, + #[prost(fixed32, repeated, tag = "3")] + pub track_ids: Vec, + #[prost(int32, optional, tag = "4")] + pub queue_position: Option, + #[prost(fixed32, optional, tag = "5")] + pub shuffle_seed: Option, + #[prost(int32, optional, tag = "6")] + pub shuffle_pivot_index: Option, + #[prost(bool, optional, tag = "7")] + pub shuffle_mode: Option, + #[prost(bytes = "vec", optional, tag = "8")] + pub context_uuid: Option>, + #[prost(bool, optional, tag = "9")] + pub autoplay_reset: Option, + #[prost(bool, optional, tag = "10")] + pub autoplay_loading: Option, +} + +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct QueueInsertTracksMessage { + #[prost(message, optional, tag = "1")] + pub queue_version_ref: Option, + #[prost(bytes = "vec", optional, tag = "2")] + pub action_uuid: Option>, + #[prost(fixed32, repeated, tag = "3")] + pub track_ids: Vec, + #[prost(int32, optional, tag = "4")] + pub insert_after: Option, + #[prost(fixed32, optional, tag = "5")] + pub shuffle_seed: Option, + #[prost(bytes = "vec", optional, tag = "6")] + pub context_uuid: Option>, + #[prost(bool, optional, tag = "7")] + pub autoplay_reset: Option, + #[prost(bool, optional, tag = "8")] + pub autoplay_loading: Option, +} + +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct QueueAddTracksMessage { + #[prost(message, optional, tag = "1")] + pub queue_version_ref: Option, + #[prost(bytes = "vec", optional, tag = "2")] + pub action_uuid: Option>, + #[prost(fixed32, repeated, tag = "3")] + pub track_ids: Vec, + #[prost(fixed32, optional, tag = "4")] + pub shuffle_seed: Option, + #[prost(bytes = "vec", optional, tag = "5")] + pub context_uuid: Option>, + #[prost(bool, optional, tag = "6")] + pub autoplay_reset: Option, + #[prost(bool, optional, tag = "7")] + pub autoplay_loading: Option, +} + +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct QueueRemoveTracksMessage { + #[prost(message, optional, tag = "1")] + pub queue_version_ref: Option, + #[prost(bytes = "vec", optional, tag = "2")] + pub action_uuid: Option>, + #[prost(int32, repeated, packed = "false", tag = "3")] + pub queue_item_ids: Vec, + #[prost(bool, optional, tag = "4")] + pub autoplay_reset: Option, + #[prost(bool, optional, tag = "5")] + pub autoplay_loading: Option, +} + +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct QueueReorderTracksMessage { + #[prost(message, optional, tag = "1")] + pub queue_version_ref: Option, + #[prost(bytes = "vec", optional, tag = "2")] + pub action_uuid: Option>, + #[prost(int32, repeated, packed = "false", tag = "3")] + pub queue_item_ids: Vec, + #[prost(int32, optional, tag = "4")] + pub insert_after: Option, + #[prost(bool, optional, tag = "5")] + pub autoplay_reset: Option, + #[prost(bool, optional, tag = "6")] + pub autoplay_loading: Option, +} + +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct SetShuffleModeMessage { + #[prost(message, optional, tag = "1")] + pub queue_version_ref: Option, + #[prost(bytes = "vec", optional, tag = "2")] + pub action_uuid: Option>, + #[prost(bool, optional, tag = "3")] + pub shuffle_mode: Option, + #[prost(fixed32, optional, tag = "4")] + pub shuffle_seed: Option, + #[prost(int32, optional, tag = "5")] + pub shuffle_pivot_queue_item_id: Option, + #[prost(bool, optional, tag = "6")] + pub autoplay_reset: Option, + #[prost(bool, optional, tag = "7")] + pub autoplay_loading: Option, +} + +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct SetAutoplayModeMessage { + #[prost(message, optional, tag = "1")] + pub queue_version_ref: Option, + #[prost(bytes = "vec", optional, tag = "2")] + pub action_uuid: Option>, + #[prost(bool, optional, tag = "3")] + pub autoplay_mode: Option, + #[prost(bool, optional, tag = "4")] + pub autoplay_reset: Option, + #[prost(bool, optional, tag = "5")] + pub autoplay_loading: Option, +} + +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct AutoplayLoadTracksMessage { + #[prost(message, optional, tag = "1")] + pub queue_version_ref: Option, + #[prost(bytes = "vec", optional, tag = "2")] + pub action_uuid: Option>, + #[prost(fixed32, repeated, tag = "3")] + pub track_ids: Vec, + #[prost(bytes = "vec", optional, tag = "4")] + pub context_uuid: Option>, +} + +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct AutoplayRemoveTracksMessage { + #[prost(message, optional, tag = "1")] + pub queue_version_ref: Option, + #[prost(bytes = "vec", optional, tag = "2")] + pub action_uuid: Option>, + #[prost(int32, repeated, packed = "false", tag = "3")] + pub queue_item_ids: Vec, +} + +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct SetQueueStateMessage { + #[prost(message, optional, tag = "1")] + pub queue_version_ref: Option, + #[prost(bytes = "vec", optional, tag = "2")] + pub action_uuid: Option>, + #[prost(message, repeated, tag = "3")] + pub tracks: Vec, + #[prost(bool, optional, tag = "4")] + pub shuffle_mode: Option, + #[prost(int32, repeated, packed = "false", tag = "5")] + pub shuffled_track_indexes: Vec, + #[prost(bool, optional, tag = "6")] + pub autoplay_mode: Option, + #[prost(bool, optional, tag = "7")] + pub autoplay_loading: Option, + #[prost(message, repeated, tag = "8")] + pub autoplay_tracks: Vec, +} + +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct AskForQueueStateMessage { + #[prost(message, optional, tag = "1")] + pub queue_version_ref: Option, + #[prost(bytes = "vec", optional, tag = "2")] + pub action_uuid: Option>, +} + +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct QueueErrorMessage { + #[prost(message, optional, tag = "1")] + pub queue_version: Option, + #[prost(bytes = "vec", optional, tag = "2")] + pub action_uuid: Option>, + #[prost(message, optional, tag = "3")] + pub error: Option, +} + +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct QueueClearedMessage { + #[prost(message, optional, tag = "1")] + pub queue_version: Option, + #[prost(bytes = "vec", optional, tag = "2")] + pub action_uuid: Option>, +} + +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct QueueStateMessage { + #[prost(message, optional, tag = "1")] + pub queue_version: Option, + #[prost(bytes = "vec", optional, tag = "2")] + pub action_uuid: Option>, + #[prost(message, repeated, tag = "3")] + pub tracks: Vec, + #[prost(bool, optional, tag = "4")] + pub shuffle_mode: Option, + #[prost(int32, repeated, packed = "false", tag = "5")] + pub shuffled_track_indexes: Vec, + #[prost(bool, optional, tag = "6")] + pub autoplay_mode: Option, + #[prost(bool, optional, tag = "7")] + pub autoplay_loading: Option, + #[prost(message, repeated, tag = "8")] + pub autoplay_tracks: Vec, + #[prost(bytes = "vec", optional, tag = "100")] + pub queue_hash: Option>, +} + +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct QueueTracksLoadedMessage { + #[prost(message, optional, tag = "1")] + pub queue_version: Option, + #[prost(bytes = "vec", optional, tag = "2")] + pub action_uuid: Option>, + #[prost(message, repeated, tag = "3")] + pub tracks: Vec, + #[prost(int32, optional, tag = "4")] + pub queue_position: Option, + #[prost(fixed32, optional, tag = "5")] + pub shuffle_seed: Option, + #[prost(int32, optional, tag = "6")] + pub shuffle_pivot_queue_item_id: Option, + #[prost(bool, optional, tag = "7")] + pub shuffle_mode: Option, + #[prost(bytes = "vec", optional, tag = "8")] + pub context_uuid: Option>, + #[prost(bool, optional, tag = "9")] + pub autoplay_reset: Option, + #[prost(bool, optional, tag = "10")] + pub autoplay_loading: Option, + #[prost(bytes = "vec", optional, tag = "100")] + pub queue_hash: Option>, +} + +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct QueueTracksInsertedMessage { + #[prost(message, optional, tag = "1")] + pub queue_version: Option, + #[prost(bytes = "vec", optional, tag = "2")] + pub action_uuid: Option>, + #[prost(message, repeated, tag = "3")] + pub tracks: Vec, + #[prost(int32, optional, tag = "4")] + pub insert_after: Option, + #[prost(fixed32, optional, tag = "5")] + pub shuffle_seed: Option, + #[prost(bytes = "vec", optional, tag = "6")] + pub context_uuid: Option>, + #[prost(bool, optional, tag = "7")] + pub autoplay_reset: Option, + #[prost(bool, optional, tag = "8")] + pub autoplay_loading: Option, + #[prost(bytes = "vec", optional, tag = "100")] + pub queue_hash: Option>, +} + +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct QueueTracksAddedMessage { + #[prost(message, optional, tag = "1")] + pub queue_version: Option, + #[prost(bytes = "vec", optional, tag = "2")] + pub action_uuid: Option>, + #[prost(message, repeated, tag = "3")] + pub tracks: Vec, + #[prost(fixed32, optional, tag = "4")] + pub shuffle_seed: Option, + #[prost(bytes = "vec", optional, tag = "5")] + pub context_uuid: Option>, + #[prost(bool, optional, tag = "6")] + pub autoplay_reset: Option, + #[prost(bool, optional, tag = "7")] + pub autoplay_loading: Option, + #[prost(bytes = "vec", optional, tag = "100")] + pub queue_hash: Option>, +} + +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct QueueTracksRemovedMessage { + #[prost(message, optional, tag = "1")] + pub queue_version: Option, + #[prost(bytes = "vec", optional, tag = "2")] + pub action_uuid: Option>, + #[prost(int32, repeated, packed = "false", tag = "3")] + pub queue_item_ids: Vec, + #[prost(bool, optional, tag = "4")] + pub autoplay_reset: Option, + #[prost(bool, optional, tag = "5")] + pub autoplay_loading: Option, + #[prost(bytes = "vec", optional, tag = "100")] + pub queue_hash: Option>, +} + +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct QueueTracksReorderedMessage { + #[prost(message, optional, tag = "1")] + pub queue_version: Option, + #[prost(bytes = "vec", optional, tag = "2")] + pub action_uuid: Option>, + #[prost(int32, repeated, packed = "false", tag = "3")] + pub queue_item_ids: Vec, + #[prost(int32, optional, tag = "4")] + pub insert_after: Option, + #[prost(bool, optional, tag = "5")] + pub autoplay_reset: Option, + #[prost(bool, optional, tag = "6")] + pub autoplay_loading: Option, + #[prost(bytes = "vec", optional, tag = "100")] + pub queue_hash: Option>, +} + +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ShuffleModeSetMessage { + #[prost(message, optional, tag = "1")] + pub queue_version: Option, + #[prost(bytes = "vec", optional, tag = "2")] + pub action_uuid: Option>, + #[prost(bool, optional, tag = "3")] + pub shuffle_mode: Option, + #[prost(fixed32, optional, tag = "4")] + pub shuffle_seed: Option, + #[prost(int32, optional, tag = "5")] + pub shuffle_pivot_queue_item_id: Option, + #[prost(bool, optional, tag = "6")] + pub autoplay_reset: Option, + #[prost(bool, optional, tag = "7")] + pub autoplay_loading: Option, + #[prost(bytes = "vec", optional, tag = "100")] + pub queue_hash: Option>, +} + +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct AutoplayModeSetMessage { + #[prost(message, optional, tag = "1")] + pub queue_version: Option, + #[prost(bytes = "vec", optional, tag = "2")] + pub action_uuid: Option>, + #[prost(bool, optional, tag = "3")] + pub autoplay_mode: Option, + #[prost(bool, optional, tag = "4")] + pub autoplay_reset: Option, + #[prost(bool, optional, tag = "5")] + pub autoplay_loading: Option, +} + +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct AutoplayTracksLoadedMessage { + #[prost(message, optional, tag = "1")] + pub queue_version: Option, + #[prost(bytes = "vec", optional, tag = "2")] + pub action_uuid: Option>, + #[prost(message, repeated, tag = "3")] + pub tracks: Vec, + #[prost(bytes = "vec", optional, tag = "4")] + pub context_uuid: Option>, +} + +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct AutoplayTracksRemovedMessage { + #[prost(message, optional, tag = "1")] + pub queue_version: Option, + #[prost(bytes = "vec", optional, tag = "2")] + pub action_uuid: Option>, + #[prost(int32, repeated, packed = "false", tag = "3")] + pub queue_item_ids: Vec, +} + +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct QueueTracksAddedFromAutoplayMessage { + #[prost(message, optional, tag = "1")] + pub queue_version: Option, + #[prost(int32, repeated, packed = "false", tag = "2")] + pub queue_item_ids: Vec, + #[prost(bytes = "vec", optional, tag = "100")] + pub queue_hash: Option>, +} + +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct RendererSetStateMessage { + #[prost(int32, optional, tag = "1")] + pub playing_state: Option, + #[prost(int32, optional, tag = "2")] + pub current_position: Option, + #[prost(message, optional, tag = "3")] + pub queue_version: Option, + #[prost(message, optional, tag = "4")] + pub current_track: Option, + #[prost(message, optional, tag = "5")] + pub next_track: Option, +} + +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct RendererSetVolumeMessage { + #[prost(int32, optional, tag = "1")] + pub volume: Option, + #[prost(int32, optional, tag = "2")] + pub volume_delta: Option, +} + +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct RendererSetActiveMessage { + #[prost(bool, optional, tag = "1")] + pub active: Option, +} + +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct RendererSetMaxAudioQualityMessage { + #[prost(int32, optional, tag = "1")] + pub max_audio_quality: Option, +} + +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct RendererSetLoopModeMessage { + #[prost(int32, optional, tag = "1")] + pub loop_mode: Option, +} + +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct RendererSetShuffleModeMessage { + #[prost(bool, optional, tag = "1")] + pub shuffle_mode: Option, +} + +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct RendererMuteVolumeMessage { + #[prost(bool, optional, tag = "1")] + pub value: Option, +} + +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct PlaybackPositionMessage { + #[prost(fixed64, optional, tag = "1")] + pub timestamp: Option, + #[prost(int32, optional, tag = "2")] + pub value: Option, +} + +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct RendererStateMessage { + #[prost(int32, optional, tag = "1")] + pub playing_state: Option, + #[prost(int32, optional, tag = "2")] + pub buffer_state: Option, + #[prost(message, optional, tag = "3")] + pub current_position: Option, + #[prost(int32, optional, tag = "4")] + pub duration: Option, + #[prost(message, optional, tag = "5")] + pub queue_version: Option, + #[prost(int32, optional, tag = "6")] + pub current_queue_item_id: Option, + #[prost(int32, optional, tag = "7")] + pub next_queue_item_id: Option, +} + +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct RendererActionMessage { + #[prost(uint32, optional, tag = "1")] + pub seek_position: Option, + /// ActionType: 0=unknown, 1=previous, 2=next, 3=repeat_off, 4=repeat_one, + /// 5=repeat_all, 6=shuffle_off, 7=shuffle_on, 8=seek + #[prost(int32, tag = "2")] + pub action: i32, +} + +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct RendererStateUpdatedMessage { + #[prost(message, optional, tag = "1")] + pub state: Option, +} + +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct RendererVolumeChangedMessage { + #[prost(int32, optional, tag = "1")] + pub volume: Option, +} + +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct RendererVolumeMutedMessage { + #[prost(bool, optional, tag = "1")] + pub value: Option, +} + +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct RendererFileAudioQualityChangedMessage { + #[prost(int32, optional, tag = "1")] + pub sampling_rate: Option, + #[prost(int32, optional, tag = "2")] + pub bit_depth: Option, + #[prost(int32, optional, tag = "3")] + pub nb_channels: Option, + #[prost(int32, optional, tag = "4")] + pub audio_quality: Option, +} + +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct RendererMaxAudioQualityChangedMessage { + #[prost(int32, optional, tag = "1")] + pub max_audio_quality: Option, + #[prost(int32, optional, tag = "2")] + pub network_type: Option, +} + +// --- SRVR_CTRL session management messages (server → controller) --- + +/// Type 81: Session state after joining +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct CtrlSessionStateMessage { + #[prost(bytes = "vec", optional, tag = "1")] + pub session_uuid: Option>, + #[prost(int32, optional, tag = "2")] + pub active_renderer_id: Option, + #[prost(message, optional, tag = "3")] + pub queue_version: Option, + #[prost(int32, optional, tag = "4")] + pub playing_state: Option, + #[prost(int32, optional, tag = "5")] + pub loop_mode: Option, +} + +/// Nested player state for CtrlRendererStateUpdatedMessage +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct CtrlRendererPlayerState { + #[prost(int32, optional, tag = "1")] + pub playing_state: Option, + #[prost(int32, optional, tag = "2")] + pub buffer_state: Option, + #[prost(message, optional, tag = "3")] + pub current_position: Option, + #[prost(uint32, optional, tag = "4")] + pub duration: Option, + #[prost(int32, optional, tag = "5")] + pub current_queue_item_id: Option, +} + +/// Type 82: Renderer state updated (controller view) +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct CtrlRendererStateUpdatedMessage { + #[prost(int32, optional, tag = "1")] + pub renderer_id: Option, + #[prost(int32, optional, tag = "2")] + pub status: Option, + #[prost(message, optional, tag = "3")] + pub player_state: Option, +} + +/// Type 83: New renderer added to session +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct CtrlAddRendererMessage { + #[prost(int32, optional, tag = "1")] + pub renderer_id: Option, + #[prost(message, optional, tag = "2")] + pub device_info: Option, +} + +/// Type 84: Renderer info updated +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct CtrlUpdateRendererMessage { + #[prost(int32, optional, tag = "1")] + pub renderer_id: Option, + #[prost(message, optional, tag = "2")] + pub device_info: Option, +} + +/// Type 85: Renderer removed from session +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct CtrlRemoveRendererMessage { + #[prost(int32, optional, tag = "1")] + pub renderer_id: Option, +} + +/// Type 86: Active renderer changed +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct CtrlActiveRendererChangedMessage { + #[prost(int32, optional, tag = "1")] + pub active_renderer_id: Option, +} + +/// Type 87: Volume changed on a renderer (controller view) +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct CtrlVolumeChangedMessage { + #[prost(int32, optional, tag = "1")] + pub renderer_id: Option, + #[prost(uint32, optional, tag = "2")] + pub volume: Option, +} + +/// Type 97: Loop mode set +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct CtrlLoopModeSetMessage { + #[prost(int32, optional, tag = "1")] + pub loop_mode: Option, +} + +/// Type 98: Volume muted (controller view) +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct CtrlVolumeMutedMessage { + #[prost(int32, optional, tag = "1")] + pub renderer_id: Option, + #[prost(bool, optional, tag = "2")] + pub value: Option, +} + +/// Type 99: Max audio quality changed (controller view) +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct CtrlMaxAudioQualityChangedMessage { + #[prost(int32, optional, tag = "1")] + pub renderer_id: Option, + #[prost(int32, optional, tag = "2")] + pub max_audio_quality: Option, + #[prost(int32, optional, tag = "3")] + pub network_type: Option, +} + +/// Type 100: File audio quality changed (controller view) +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct CtrlFileAudioQualityChangedMessage { + #[prost(int32, optional, tag = "1")] + pub renderer_id: Option, + #[prost(uint32, optional, tag = "2")] + pub sampling_rate: Option, + #[prost(uint32, optional, tag = "3")] + pub bit_depth: Option, + #[prost(uint32, optional, tag = "4")] + pub nb_channels: Option, + #[prost(int32, optional, tag = "5")] + pub audio_quality: Option, +} + +/// Type 101: Device audio quality changed (controller view) +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct CtrlDeviceAudioQualityChangedMessage { + #[prost(int32, optional, tag = "1")] + pub renderer_id: Option, + #[prost(uint32, optional, tag = "2")] + pub sampling_rate: Option, + #[prost(uint32, optional, tag = "3")] + pub bit_depth: Option, + #[prost(uint32, optional, tag = "4")] + pub nb_channels: Option, +} + +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct QConnectMessages { + #[prost(fixed64, optional, tag = "1")] + pub messages_time: Option, + #[prost(int32, optional, tag = "2")] + pub messages_id: Option, + #[prost(message, repeated, tag = "3")] + pub messages: Vec, +} + +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct QConnectMessage { + #[prost(int32, optional, tag = "1")] + pub message_type: Option, + #[prost(message, optional, tag = "21")] + pub rndr_srvr_join_session: Option, + #[prost(message, optional, tag = "22")] + pub rndr_srvr_device_info_updated: Option, + #[prost(message, optional, tag = "23")] + pub rndr_srvr_state_updated: Option, + #[prost(message, optional, tag = "24")] + pub rndr_srvr_renderer_action: Option, + #[prost(message, optional, tag = "25")] + pub rndr_srvr_volume_changed: Option, + #[prost(message, optional, tag = "26")] + pub rndr_srvr_file_audio_quality_changed: Option, + #[prost(message, optional, tag = "28")] + pub rndr_srvr_max_audio_quality_changed: Option, + #[prost(message, optional, tag = "29")] + pub rndr_srvr_volume_muted: Option, + #[prost(message, optional, tag = "41")] + pub srvr_rndr_set_state: Option, + #[prost(message, optional, tag = "42")] + pub srvr_rndr_set_volume: Option, + #[prost(message, optional, tag = "43")] + pub srvr_rndr_set_active: Option, + #[prost(message, optional, tag = "44")] + pub srvr_rndr_set_max_audio_quality: Option, + #[prost(message, optional, tag = "45")] + pub srvr_rndr_set_loop_mode: Option, + #[prost(message, optional, tag = "46")] + pub srvr_rndr_set_shuffle_mode: Option, + #[prost(message, optional, tag = "47")] + pub srvr_rndr_mute_volume: Option, + #[prost(message, optional, tag = "61")] + pub ctrl_srvr_join_session: Option, + #[prost(message, optional, tag = "62")] + pub ctrl_srvr_set_player_state: Option, + #[prost(message, optional, tag = "63")] + pub ctrl_srvr_set_active_renderer: Option, + #[prost(message, optional, tag = "64")] + pub ctrl_srvr_set_volume: Option, + #[prost(message, optional, tag = "65")] + pub ctrl_srvr_clear_queue: Option, + #[prost(message, optional, tag = "66")] + pub ctrl_srvr_queue_load_tracks: Option, + #[prost(message, optional, tag = "67")] + pub ctrl_srvr_queue_insert_tracks: Option, + #[prost(message, optional, tag = "68")] + pub ctrl_srvr_queue_add_tracks: Option, + #[prost(message, optional, tag = "69")] + pub ctrl_srvr_queue_remove_tracks: Option, + #[prost(message, optional, tag = "70")] + pub ctrl_srvr_queue_reorder_tracks: Option, + #[prost(message, optional, tag = "71")] + pub ctrl_srvr_set_shuffle_mode: Option, + #[prost(message, optional, tag = "72")] + pub ctrl_srvr_set_loop_mode: Option, + #[prost(message, optional, tag = "73")] + pub ctrl_srvr_mute_volume: Option, + #[prost(message, optional, tag = "74")] + pub ctrl_srvr_set_max_audio_quality: Option, + #[prost(message, optional, tag = "75")] + pub ctrl_srvr_set_queue_state: Option, + #[prost(message, optional, tag = "76")] + pub ctrl_srvr_ask_for_queue_state: Option, + #[prost(message, optional, tag = "77")] + pub ctrl_srvr_ask_for_renderer_state: Option, + #[prost(message, optional, tag = "78")] + pub ctrl_srvr_set_autoplay_mode: Option, + #[prost(message, optional, tag = "79")] + pub ctrl_srvr_autoplay_load_tracks: Option, + #[prost(message, optional, tag = "80")] + pub ctrl_srvr_autoplay_remove_tracks: Option, + #[prost(message, optional, tag = "81")] + pub srvr_ctrl_session_state: Option, + #[prost(message, optional, tag = "82")] + pub srvr_ctrl_renderer_state_updated: Option, + #[prost(message, optional, tag = "83")] + pub srvr_ctrl_add_renderer: Option, + #[prost(message, optional, tag = "84")] + pub srvr_ctrl_update_renderer: Option, + #[prost(message, optional, tag = "85")] + pub srvr_ctrl_remove_renderer: Option, + #[prost(message, optional, tag = "86")] + pub srvr_ctrl_active_renderer_changed: Option, + #[prost(message, optional, tag = "87")] + pub srvr_ctrl_volume_changed: Option, + #[prost(message, optional, tag = "88")] + pub srvr_ctrl_queue_error_message: Option, + #[prost(message, optional, tag = "89")] + pub srvr_ctrl_queue_cleared: Option, + #[prost(message, optional, tag = "90")] + pub srvr_ctrl_queue_state: Option, + #[prost(message, optional, tag = "91")] + pub srvr_ctrl_queue_tracks_loaded: Option, + #[prost(message, optional, tag = "92")] + pub srvr_ctrl_queue_tracks_inserted: Option, + #[prost(message, optional, tag = "93")] + pub srvr_ctrl_queue_tracks_added: Option, + #[prost(message, optional, tag = "94")] + pub srvr_ctrl_queue_tracks_removed: Option, + #[prost(message, optional, tag = "95")] + pub srvr_ctrl_queue_tracks_reordered: Option, + #[prost(message, optional, tag = "96")] + pub srvr_ctrl_shuffle_mode_set: Option, + #[prost(message, optional, tag = "97")] + pub srvr_ctrl_loop_mode_set: Option, + #[prost(message, optional, tag = "98")] + pub srvr_ctrl_volume_muted: Option, + #[prost(message, optional, tag = "99")] + pub srvr_ctrl_max_audio_quality_changed: Option, + #[prost(message, optional, tag = "100")] + pub srvr_ctrl_file_audio_quality_changed: Option, + #[prost(message, optional, tag = "101")] + pub srvr_ctrl_device_audio_quality_changed: Option, + #[prost(message, optional, tag = "102")] + pub srvr_ctrl_autoplay_mode_set: Option, + #[prost(message, optional, tag = "103")] + pub srvr_ctrl_autoplay_tracks_loaded: Option, + #[prost(message, optional, tag = "104")] + pub srvr_ctrl_autoplay_tracks_removed: Option, + #[prost(message, optional, tag = "105")] + pub srvr_ctrl_queue_tracks_added_from_autoplay: Option, +} diff --git a/src/qconnect.rs b/src/qconnect.rs index 208dd2a..4805697 100644 --- a/src/qconnect.rs +++ b/src/qconnect.rs @@ -1,362 +1,337 @@ -use std::time::{SystemTime, UNIX_EPOCH}; +use std::sync::Arc; +use std::sync::atomic::{AtomicI32, Ordering}; +use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; -use anyhow::{bail, Result}; use futures_util::{SinkExt, StreamExt}; -use tokio::sync::mpsc; -use tokio_tungstenite::tungstenite::Message; +use prost::encoding::{decode_varint, encode_varint}; +use prost::Message; +use tokio::sync::{mpsc, Mutex}; +use tokio_tungstenite::{ + connect_async_with_config, + tungstenite::{protocol::WebSocketConfig, Message as WsMessage}, +}; use tracing::{debug, error, info, warn}; +use uuid::Uuid; use crate::api::{QobuzApi, TrackStream}; use crate::config::Config; use crate::player::{AudioPlayer, PlayerCommand, PlayerState, StreamSource}; +use crate::proto::*; // --------------------------------------------------------------------------- -// Protobuf helpers (hand-rolled, matching the qconnect.proto schema) +// Constants // --------------------------------------------------------------------------- -fn now_millis() -> u64 { +const MSG_TYPE_AUTHENTICATE: u8 = 1; +const MSG_TYPE_SUBSCRIBE: u8 = 2; +const MSG_TYPE_PAYLOAD: u8 = 6; + +// --------------------------------------------------------------------------- +// Wire-level prost structs (mirror qbz transport) +// --------------------------------------------------------------------------- + +#[derive(Clone, PartialEq, ::prost::Message)] +struct Authenticate { + #[prost(uint32, optional, tag = "1")] + msg_id: Option, + #[prost(uint64, optional, tag = "2")] + msg_date: Option, + #[prost(string, optional, tag = "3")] + jwt: Option, +} + +#[derive(Clone, PartialEq, ::prost::Message)] +struct Subscribe { + #[prost(uint32, optional, tag = "1")] + msg_id: Option, + #[prost(uint64, optional, tag = "2")] + msg_date: Option, + #[prost(uint32, optional, tag = "3")] + proto: Option, + #[prost(bytes = "vec", repeated, tag = "4")] + channels: Vec>, +} + +#[derive(Clone, PartialEq, ::prost::Message)] +struct CloudPayload { + #[prost(uint32, optional, tag = "1")] + msg_id: Option, + #[prost(uint64, optional, tag = "2")] + msg_date: Option, + #[prost(uint32, optional, tag = "3")] + proto: Option, + #[prost(bytes = "vec", optional, tag = "4")] + src: Option>, + #[prost(bytes = "vec", repeated, tag = "5")] + dests: Vec>, + #[prost(bytes = "vec", optional, tag = "7")] + payload: Option>, +} + +// --------------------------------------------------------------------------- +// Frame / wire helpers +// --------------------------------------------------------------------------- + +fn now_ms() -> u64 { SystemTime::now() .duration_since(UNIX_EPOCH) - .unwrap() + .unwrap_or_default() .as_millis() as u64 } -fn encode_varint(mut val: u64) -> Vec { - let mut buf = Vec::with_capacity(10); - loop { - let mut byte = (val & 0x7F) as u8; - val >>= 7; - if val != 0 { - byte |= 0x80; - } - buf.push(byte); - if val == 0 { - break; - } +fn encode_frame(msg_type: u8, payload: &[u8]) -> Vec { + let mut frame = Vec::with_capacity(1 + 10 + payload.len()); + frame.push(msg_type); + encode_varint(payload.len() as u64, &mut frame); + frame.extend_from_slice(payload); + frame +} + +fn decode_frame(data: &[u8]) -> anyhow::Result<(u8, Vec)> { + if data.is_empty() { + return Err(anyhow::anyhow!("empty qcloud frame")); } - buf -} - -fn encode_field_varint(field: u32, val: u64) -> Vec { - let tag = (field as u64) << 3; - let mut out = encode_varint(tag); - out.extend(encode_varint(val)); - out -} - -fn encode_field_bytes(field: u32, data: &[u8]) -> Vec { - let tag = ((field as u64) << 3) | 2; - let mut out = encode_varint(tag); - out.extend(encode_varint(data.len() as u64)); - out.extend_from_slice(data); - out -} - -fn encode_field_string(field: u32, s: &str) -> Vec { - encode_field_bytes(field, s.as_bytes()) -} - -fn encode_field_fixed64(field: u32, val: u64) -> Vec { - let tag = ((field as u64) << 3) | 1; - let mut out = encode_varint(tag); - out.extend_from_slice(&val.to_le_bytes()); - out -} - -fn decode_varint(data: &[u8]) -> Option<(u64, usize)> { - let mut val: u64 = 0; - let mut shift = 0; - for (i, &byte) in data.iter().enumerate() { - val |= ((byte & 0x7F) as u64) << shift; - if byte & 0x80 == 0 { - return Some((val, i + 1)); - } - shift += 7; - if shift >= 64 { - return None; - } + let msg_type = data[0]; + let mut cursor = &data[1..]; + let payload_len = decode_varint(&mut cursor) + .map_err(|e| anyhow::anyhow!("decode varint: {e}"))? as usize; + let consumed = data.len().saturating_sub(1 + cursor.len()); + let start = 1 + consumed; + if data.len() < start + payload_len { + return Err(anyhow::anyhow!( + "truncated qcloud frame: expected={}, got={}", + start + payload_len, + data.len() + )); } - None + Ok((msg_type, data[start..start + payload_len].to_vec())) } -/// Parsed protobuf field: (field_number, wire_type, raw_data). -/// For varint (wire_type 0), data is the re-encoded varint bytes. -/// For length-delimited (wire_type 2), data is the payload bytes. -/// For fixed64 (wire_type 1), data is the 8 raw bytes. -fn parse_fields(data: &[u8]) -> Vec<(u32, u8, Vec)> { - let mut fields = Vec::new(); - let mut pos = 0; - while pos < data.len() { - let (tag, n) = match decode_varint(&data[pos..]) { - Some(v) => v, - None => break, - }; - pos += n; - let field_number = (tag >> 3) as u32; - let wire_type = (tag & 0x07) as u8; +static BATCH_SEQ: AtomicI32 = AtomicI32::new(1); - match wire_type { - 0 => { - // Varint - let (val, n) = match decode_varint(&data[pos..]) { - Some(v) => v, - None => break, - }; - pos += n; - fields.push((field_number, wire_type, val.to_le_bytes().to_vec())); - } - 1 => { - // Fixed64 - if pos + 8 > data.len() { - break; - } - fields.push((field_number, wire_type, data[pos..pos + 8].to_vec())); - pos += 8; - } - 2 => { - // Length-delimited - let (len, n) = match decode_varint(&data[pos..]) { - Some(v) => v, - None => break, - }; - pos += n; - let len = len as usize; - if pos + len > data.len() { - break; - } - fields.push((field_number, wire_type, data[pos..pos + len].to_vec())); - pos += len; - } - 5 => { - // Fixed32 - if pos + 4 > data.len() { - break; - } - fields.push((field_number, wire_type, data[pos..pos + 4].to_vec())); - pos += 4; - } - _ => break, - } - } - fields +fn build_payload_frame(message: QConnectMessage, msg_id: &mut u32) -> Vec { + let batch = QConnectMessages { + messages_time: Some(now_ms()), + messages_id: Some(BATCH_SEQ.fetch_add(1, Ordering::Relaxed)), + messages: vec![message], + }; + let inner = batch.encode_to_vec(); + + *msg_id = msg_id.saturating_add(1); + let cloud = CloudPayload { + msg_id: Some(*msg_id), + msg_date: Some(now_ms()), + proto: Some(1), + src: None, + dests: Vec::new(), + payload: Some(inner), + }; + + encode_frame(MSG_TYPE_PAYLOAD, &cloud.encode_to_vec()) } -fn get_varint_field(fields: &[(u32, u8, Vec)], field_num: u32) -> Option { - for (num, wt, data) in fields { - if *num == field_num && *wt == 0 { - let mut val: u64 = 0; - for (i, &b) in data.iter().enumerate().take(8) { - val |= (b as u64) << (i * 8); - } - return Some(val); - } - } - None +fn build_auth_frame(jwt: &str, msg_id: &mut u32) -> Vec { + *msg_id = msg_id.saturating_add(1); + let auth = Authenticate { + msg_id: Some(*msg_id), + msg_date: Some(now_ms()), + jwt: Some(jwt.to_string()), + }; + encode_frame(MSG_TYPE_AUTHENTICATE, &auth.encode_to_vec()) } -fn get_fixed32_field(fields: &[(u32, u8, Vec)], field_num: u32) -> Option { - for (num, wt, data) in fields { - if *num == field_num && *wt == 5 && data.len() >= 4 { - return Some(u32::from_le_bytes([data[0], data[1], data[2], data[3]])); - } - } - None -} - -fn get_bytes_field<'a>(fields: &'a [(u32, u8, Vec)], field_num: u32) -> Option<&'a [u8]> { - for (num, wt, data) in fields { - if *num == field_num && *wt == 2 { - return Some(data.as_slice()); - } - } - None +fn build_subscribe_frame(msg_id: &mut u32) -> Vec { + *msg_id = msg_id.saturating_add(1); + let sub = Subscribe { + msg_id: Some(*msg_id), + msg_date: Some(now_ms()), + proto: Some(1), + channels: vec![vec![0x02]], + }; + encode_frame(MSG_TYPE_SUBSCRIBE, &sub.encode_to_vec()) } // --------------------------------------------------------------------------- -// WebSocket frame layer (outer transport framing, NOT protobuf) +// Outbound message builders // --------------------------------------------------------------------------- -fn build_frame(frame_type: u8, body: &[u8]) -> Vec { - let mut out = vec![frame_type]; - out.extend(encode_varint(body.len() as u64)); - out.extend_from_slice(body); - out -} - -fn decode_frame(data: &[u8], pos: &mut usize) -> Option<(u8, Vec)> { - if *pos >= data.len() { - return None; - } - let frame_type = data[*pos]; - *pos += 1; - let (len_val, n) = decode_varint(&data[*pos..])?; - *pos += n; - let len = len_val as usize; - if *pos + len > data.len() { - return None; - } - let payload = data[*pos..*pos + len].to_vec(); - *pos += len; - Some((frame_type, payload)) -} - -fn decode_all_frames(data: &[u8]) -> Vec<(u8, Vec)> { - let mut out = Vec::new(); - let mut pos = 0; - while pos < data.len() { - match decode_frame(data, &mut pos) { - Some(v) => out.push(v), - None => break, - } - } - out -} - -// --------------------------------------------------------------------------- -// Frame builders -// --------------------------------------------------------------------------- - -fn build_auth_frame(msg_id: u64, jwt: &str) -> Vec { - let mut body = encode_field_varint(1, msg_id); - body.extend(encode_field_string(3, jwt)); - build_frame(1, &body) -} - -fn build_subscribe_frame(msg_id: u64) -> Vec { - let mut body = encode_field_varint(1, msg_id); - body.extend(encode_field_varint(3, 1)); - build_frame(2, &body) -} - -fn build_payload_frame(msg_id: u64, qc_data: &[u8]) -> Vec { - let mut body = encode_field_varint(1, msg_id); - body.extend(encode_field_varint(2, now_millis())); - body.extend(encode_field_varint(3, 1)); - body.extend(encode_field_bytes(5, &[0x02])); - body.extend(encode_field_bytes(7, qc_data)); - build_frame(6, &body) -} - -// --------------------------------------------------------------------------- -// QConnect message builders -// --------------------------------------------------------------------------- - -/// Wraps a QConnect message (field 1 = message_type, field N = payload) -/// inside a field-3 container, as the protocol expects. -fn build_qconnect_message(message_type: u32, payload: &[u8]) -> Vec { - let mut inner = encode_field_varint(1, message_type as u64); - inner.extend(encode_field_bytes(message_type, payload)); - encode_field_bytes(3, &inner) -} - -fn uuid_to_bytes(uuid_str: &str) -> Vec { - uuid::Uuid::parse_str(uuid_str) +fn device_uuid_bytes(config: &Config) -> Option> { + Uuid::parse_str(&config.device_id) .map(|u| u.as_bytes().to_vec()) - .unwrap_or_else(|_| uuid_str.as_bytes().to_vec()) + .ok() } -fn build_device_info(device_uuid: &str, device_name: &str) -> Vec { - let mut out = Vec::new(); - out.extend(encode_field_bytes(1, &uuid_to_bytes(device_uuid))); // device_uuid - out.extend(encode_field_string(2, device_name)); // friendly_name - out.extend(encode_field_string(3, "QobuzD")); // brand - out.extend(encode_field_string(4, "Linux")); // model - out.extend(encode_field_string(5, device_uuid)); // serial_number - out.extend(encode_field_varint(6, 5)); // type = COMPUTER(5) - let mut caps = encode_field_varint(1, 1); - caps.extend(encode_field_varint(2, 5)); - caps.extend(encode_field_varint(3, 2)); - out.extend(encode_field_bytes(7, &caps)); - out.extend(encode_field_string(8, "qobuzd-0.1.0")); // software_version - out +fn make_device_info(config: &Config) -> DeviceInfoMessage { + DeviceInfoMessage { + device_uuid: device_uuid_bytes(config), + friendly_name: Some(config.device_name.clone()), + brand: Some("QobuzD".to_string()), + model: Some("Linux".to_string()), + serial_number: None, + device_type: Some(5), + capabilities: None, + software_version: Some("qobuzd-0.1.0".to_string()), + } } -/// CTRL_SRVR_JOIN_SESSION (61): controller asks server to create/join session. -fn msg_ctrl_join_session(device_uuid: &str, device_name: &str) -> Vec { - let device_info = build_device_info(device_uuid, device_name); - let payload = encode_field_bytes(2, &device_info); - build_qconnect_message(61, &payload) +fn build_ctrl_join_session(config: &Config) -> QConnectMessage { + QConnectMessage { + message_type: Some(QConnectMessageType::MessageTypeCtrlSrvrJoinSession as i32), + ctrl_srvr_join_session: Some(JoinSessionMessage { + session_uuid: None, + device_info: Some(make_device_info(config)), + reason: None, + initial_state: None, + is_active: None, + }), + ..Default::default() + } } -/// RNDR_SRVR_JOIN_SESSION (21): renderer joins an existing session. -fn msg_renderer_join_session(device_uuid: &str, device_name: &str, session_uuid: &[u8]) -> Vec { - let device_info = build_device_info(device_uuid, device_name); - let initial_state = build_renderer_state(1, 2, 0, 0, -1, -1); // stopped, buffer_state=OK(2) - let mut payload = Vec::new(); - payload.extend(encode_field_bytes(1, session_uuid)); - payload.extend(encode_field_bytes(2, &device_info)); - payload.extend(encode_field_bytes(4, &initial_state)); - payload.extend(encode_field_varint(5, 1)); // is_active = true - build_qconnect_message(21, &payload) +fn build_renderer_join_session( + session_uuid: Vec, + config: &Config, + playing_state: i32, +) -> QConnectMessage { + QConnectMessage { + message_type: Some(QConnectMessageType::MessageTypeRndrSrvrJoinSession as i32), + rndr_srvr_join_session: Some(JoinSessionMessage { + session_uuid: Some(session_uuid), + device_info: Some(make_device_info(config)), + reason: None, + initial_state: Some(RendererStateMessage { + playing_state: Some(playing_state), + buffer_state: Some(if playing_state == 2 || playing_state == 3 { + 2 + } else { + 1 + }), + current_position: None, + duration: None, + queue_version: Some(QueueVersionRef { + major: Some(0), + minor: Some(0), + }), + current_queue_item_id: None, + next_queue_item_id: None, + }), + is_active: Some(true), + }), + ..Default::default() + } } -/// Build a RendererState protobuf. -/// buffer_state: 1=BUFFERING, 2=OK (per common.proto BufferState enum) -/// Encode a signed int32 as a protobuf varint field (sign-extended to 64 bits, matching proto int32 encoding). -fn encode_field_int32(field: u32, val: i32) -> Vec { - let tag = (field as u64) << 3; - let mut out = encode_varint(tag); - // Protobuf int32 sign-extends to 64 bits: -1 becomes 0xFFFFFFFFFFFFFFFF (10-byte varint) - out.extend(encode_varint(val as i64 as u64)); - out -} - -fn build_renderer_state( - playing_state: u64, - buffer_state: u64, +fn build_state_updated( + playing_state: i32, position_ms: u64, duration_ms: u64, queue_item_id: i32, next_queue_item_id: i32, -) -> Vec { - let mut out = Vec::new(); - out.extend(encode_field_varint(1, playing_state)); // field 1: playing_state - out.extend(encode_field_varint(2, buffer_state)); // field 2: buffer_state - // field 3: current_position (PlaybackPosition: field 1=timestamp fixed64, field 2=value ms) - let mut pos = Vec::new(); - pos.extend(encode_field_fixed64(1, now_millis())); // timestamp - if playing_state != 1 || buffer_state != 1 { - // Real app omits position_ms when STOPPED+BUFFERING(1) - pos.extend(encode_field_varint(2, position_ms)); // value (ms) +) -> QConnectMessage { + let current_qid = if queue_item_id <= 0 { + None + } else { + Some(queue_item_id) + }; + let next_qid = if next_queue_item_id <= 0 { + None + } else { + Some(next_queue_item_id) + }; + + QConnectMessage { + message_type: Some(QConnectMessageType::MessageTypeRndrSrvrStateUpdated as i32), + rndr_srvr_state_updated: Some(RendererStateUpdatedMessage { + state: Some(RendererStateMessage { + playing_state: Some(playing_state), + buffer_state: Some(if playing_state == 2 || playing_state == 3 { + 2 + } else { + 1 + }), + current_position: Some(PlaybackPositionMessage { + timestamp: Some(now_ms()), + value: Some(position_ms as i32), + }), + duration: if duration_ms > 0 { + Some(duration_ms as i32) + } else { + None + }, + queue_version: Some(QueueVersionRef { + major: Some(0), + minor: Some(0), + }), + current_queue_item_id: current_qid, + next_queue_item_id: next_qid, + }), + }), + ..Default::default() } - out.extend(encode_field_bytes(3, &pos)); - if duration_ms > 0 { - out.extend(encode_field_varint(4, duration_ms)); // field 4: duration (ms) - } - // field 5: queue_version (QueueVersion: field 1=major, field 2=minor) - // mpv reference client sends QueueVersion(major=0, minor=0) — proto3 default encodes as empty submessage - out.extend(encode_field_bytes(5, &[])); - // field 6: current_queue_item_id — real app sends -1 when no track (never omits) - out.extend(encode_field_int32(6, queue_item_id)); - // field 7: next_queue_item_id — real app sends -1 when no next track - out.extend(encode_field_int32(7, next_queue_item_id)); - out } -/// RNDR_SRVR_STATE_UPDATED (23): renderer reports its state. -fn msg_state_updated( - playing_state: u64, - buffer_state: u64, - position_ms: u64, - duration_ms: u64, - queue_item_id: i32, - next_queue_item_id: i32, -) -> Vec { - let state = build_renderer_state( - playing_state, - buffer_state, - position_ms, - duration_ms, - queue_item_id, - next_queue_item_id, - ); - let payload = encode_field_bytes(1, &state); - build_qconnect_message(23, &payload) +fn build_volume_changed(volume: i32) -> QConnectMessage { + QConnectMessage { + message_type: Some(QConnectMessageType::MessageTypeRndrSrvrVolumeChanged as i32), + rndr_srvr_volume_changed: Some(RendererVolumeChangedMessage { + volume: Some(volume), + }), + ..Default::default() + } } -/// Convert QConnect AudioQuality proto value to Qobuz API format_id. -/// Proto: 1=MP3, 2=CD, 3=HiRes96, 4=HiRes192, 5=HiRes192(max), 0/other=HiRes192 default -fn quality_to_format_id(quality: u32) -> u32 { +fn build_volume_muted(value: bool) -> QConnectMessage { + QConnectMessage { + message_type: Some(QConnectMessageType::MessageTypeRndrSrvrVolumeMuted as i32), + rndr_srvr_volume_muted: Some(RendererVolumeMutedMessage { value: Some(value) }), + ..Default::default() + } +} + +fn build_file_quality_changed(sr: i32, bits: i32, ch: i32, quality: i32) -> QConnectMessage { + QConnectMessage { + message_type: Some( + QConnectMessageType::MessageTypeRndrSrvrFileAudioQualityChanged as i32, + ), + rndr_srvr_file_audio_quality_changed: Some(RendererFileAudioQualityChangedMessage { + sampling_rate: Some(sr), + bit_depth: Some(bits), + nb_channels: Some(ch), + audio_quality: Some(quality), + }), + ..Default::default() + } +} + +fn build_renderer_action_next() -> QConnectMessage { + QConnectMessage { + message_type: Some(QConnectMessageType::MessageTypeRndrSrvrRendererAction as i32), + rndr_srvr_renderer_action: Some(RendererActionMessage { + seek_position: None, + action: 2, // ACTION_TYPE_NEXT + }), + ..Default::default() + } +} + +fn build_max_quality_changed(max_quality: i32) -> QConnectMessage { + QConnectMessage { + message_type: Some( + QConnectMessageType::MessageTypeRndrSrvrMaxAudioQualityChanged as i32, + ), + rndr_srvr_max_audio_quality_changed: Some(RendererMaxAudioQualityChangedMessage { + max_audio_quality: Some(max_quality), + network_type: None, + }), + ..Default::default() + } +} + +// --------------------------------------------------------------------------- +// Quality helpers +// --------------------------------------------------------------------------- + +fn quality_to_format_id(quality: i32) -> u32 { match quality { 1 => 5, // MP3 320kbps 2 => 6, // FLAC CD 16-bit/44.1kHz @@ -366,1091 +341,955 @@ fn quality_to_format_id(quality: u32) -> u32 { } } -/// RNDR_SRVR_RENDERER_ACTION (24): renderer reports a local user action. -/// ActionType: 0=UNKNOWN, 1=PREVIOUS, 2=NEXT, 3=REPEAT_OFF, 4=REPEAT_ONE, 5=REPEAT_ALL, -/// 6=SHUFFLE_OFF, 7=SHUFFLE_ON, 8=SEEK -fn msg_renderer_action(action: u64, seek_position: Option) -> Vec { - let mut payload = Vec::new(); - if let Some(pos) = seek_position { - payload.extend(encode_field_varint(1, pos as u64)); // field 1: seek_position - } - payload.extend(encode_field_varint(2, action)); // field 2: action - build_qconnect_message(24, &payload) -} - -/// RNDR_SRVR_VOLUME_CHANGED (25): renderer reports volume. -fn msg_volume_changed(volume: u64) -> Vec { - let payload = encode_field_varint(1, volume); - build_qconnect_message(25, &payload) -} - -/// RNDR_SRVR_MAX_AUDIO_QUALITY_CHANGED (28): renderer confirms quality setting. -/// networkType (field 2) is optional; when absent, quality applies generically. -fn msg_max_audio_quality_changed(quality: u64, network_type: Option) -> Vec { - let mut payload = encode_field_varint(1, quality); - if let Some(network_type) = network_type { - payload.extend(encode_field_varint(2, network_type)); - } - build_qconnect_message(28, &payload) -} - -fn msg_file_audio_quality_changed( - sampling_rate_hz: u64, - bit_depth: u64, - channels: u64, - audio_quality: u64, -) -> Vec { - let mut payload = encode_field_varint(1, sampling_rate_hz); - payload.extend(encode_field_varint(2, bit_depth)); - payload.extend(encode_field_varint(3, channels)); - payload.extend(encode_field_varint(4, audio_quality)); - build_qconnect_message(26, &payload) -} - -fn msg_device_audio_quality_changed( - sampling_rate_hz: u64, - bit_depth: u64, - channels: u64, -) -> Vec { - let mut payload = encode_field_varint(1, sampling_rate_hz); - payload.extend(encode_field_varint(2, bit_depth)); - payload.extend(encode_field_varint(3, channels)); - build_qconnect_message(27, &payload) -} - -fn quality_fallback_audio_params(quality: u32) -> (u32, u32, u32) { +fn quality_fallback_audio_params(quality: i32) -> (u32, u32, u32) { match quality { - 1 => (44100, 16, 2), // MP3 - 2 => (44100, 16, 2), // CD - 3 => (96000, 24, 2), // Hi-Res up to 96kHz - 4 | 5 => (192000, 24, 2), // Hi-Res up to 192/384kHz (use 192kHz fallback) + 1 => (44100, 16, 2), // MP3 + 2 => (44100, 16, 2), // CD + 3 => (96000, 24, 2), // Hi-Res up to 96kHz + 4 | 5 => (192000, 24, 2), // Hi-Res up to 192/384kHz _ => (44100, 16, 2), } } -/// RNDR_SRVR_VOLUME_MUTED (29): renderer confirms mute state. -fn msg_volume_muted(muted: bool) -> Vec { - let payload = encode_field_varint(1, if muted { 1 } else { 0 }); - build_qconnect_message(29, &payload) +// --------------------------------------------------------------------------- +// Local renderer state +// --------------------------------------------------------------------------- + +struct RendererLocal { + current_track_id: i32, + current_queue_item_id: i32, + current_next_queue_item_id: i32, + current_duration_ms: u64, + current_playing_state: i32, // 1=stopped, 2=playing, 3=paused + max_audio_quality: i32, // default: 4 + volume: u8, // default: 100 + pre_mute_volume: u8, // default: 100 + muted: bool, + track_ended: bool, + has_position_progress: bool, + last_play_at: Instant, + ignore_seek_until: Option, + joined: bool, } -// --------------------------------------------------------------------------- -// QConnect message parser — extracts messages from the frame layer -// --------------------------------------------------------------------------- - -/// Extracts the QConnect Message from a data frame's body. -/// Frame body field 7 = qconnect container, which has field 3 = Message. -/// Returns (message_type, payload_for_that_type) pairs. -fn extract_qconnect_messages(frame_body: &[u8]) -> Vec<(u32, Vec)> { - let mut result = Vec::new(); - let fields = parse_fields(frame_body); - - // Field 7 is the qconnect container - for (fnum, wt, data) in &fields { - if *fnum == 7 && *wt == 2 { - // Inside field 7, field 3 is the serialized QConnect Message - let container_fields = parse_fields(data); - for (cfnum, cwt, cdata) in &container_fields { - if *cfnum == 3 && *cwt == 2 { - // This is the QConnect Message - let msg_fields = parse_fields(cdata); - let msg_type = get_varint_field(&msg_fields, 1).unwrap_or(0) as u32; - // The payload is in the field whose number matches message_type - if let Some(payload) = get_bytes_field(&msg_fields, msg_type) { - result.push((msg_type, payload.to_vec())); - } else { - // Some messages have no sub-payload (just the type) - result.push((msg_type, Vec::new())); - } - } - } +impl Default for RendererLocal { + fn default() -> Self { + Self { + current_track_id: 0, + current_queue_item_id: -1, + current_next_queue_item_id: -1, + current_duration_ms: 0, + current_playing_state: 1, + max_audio_quality: 4, + volume: 100, + pre_mute_volume: 100, + muted: false, + track_ended: false, + has_position_progress: false, + last_play_at: Instant::now(), + ignore_seek_until: None, + joined: false, } } - result } // --------------------------------------------------------------------------- -// Parsed incoming commands +// RendererHandler // --------------------------------------------------------------------------- -#[derive(Debug, Clone)] -pub enum QConnectCommand { - SetState { - playing_state: Option, // None = not set (keep current), Some(1)=stopped, Some(2)=playing, Some(3)=paused - position_ms: Option, // None = field not present - current_track: Option, - next_track: Option, - queue_version_major: u32, - }, - SetVolume { - volume: Option, - delta: Option, - }, - SetActive { - active: bool, - }, - SetLoopMode(u32), - SetShuffleMode(u32), - MuteVolume(bool), - SetMaxAudioQuality(u32), - Unknown(u32), +#[derive(Clone)] +struct RendererHandler { + api: Arc, + auth_token: Arc, + player: Arc, + local: Arc>, + out_tx: mpsc::Sender, + config: Arc, } -#[derive(Debug, Clone)] -pub struct TrackRef { - pub queue_item_id: i32, - pub track_id: i32, -} - -fn parse_queue_track(data: &[u8]) -> TrackRef { - let fields = parse_fields(data); - let queue_item_id = get_varint_field(&fields, 1).unwrap_or(0) as i32; - // track_id is fixed32 on the wire (not varint) - let track_id = get_fixed32_field(&fields, 2).unwrap_or(0) as i32; - TrackRef { - queue_item_id, - track_id, +impl RendererHandler { + async fn send(&self, msg: QConnectMessage) { + if let Err(e) = self.out_tx.send(msg).await { + warn!("[QConnect] Failed to queue outbound message: {e}"); + } } -} -fn parse_incoming_commands(data: &[u8]) -> Vec { - let mut cmds = Vec::new(); - - for (frame_type, frame_body) in decode_all_frames(data) { - if frame_type != 6 { - debug!( - "[FRAME] type={} body={} bytes", - frame_type, - frame_body.len() - ); - continue; // Only process data payload frames + async fn join_session(&self, session_uuid: Vec) { + let (playing_state, already_joined) = { + let local = self.local.lock().await; + (local.current_playing_state, local.joined) + }; + if already_joined { + return; } - for (msg_type, payload) in extract_qconnect_messages(&frame_body) { - let cmd = match msg_type { - // SRVR_RNDR_SET_STATE (41) - 41 => { - let fields = parse_fields(&payload); - let playing_state = get_varint_field(&fields, 1).map(|v| v as u32); // None = not present - let position_ms = get_varint_field(&fields, 2).map(|v| v as u32); - let queue_version_major = get_bytes_field(&fields, 3) - .map(|qv| { - let qvf = parse_fields(qv); - get_varint_field(&qvf, 1).unwrap_or(0) as u32 - }) - .unwrap_or(0); - let current_track = get_bytes_field(&fields, 4) - .map(parse_queue_track) - .and_then(|t| { - if t.track_id <= 0 || t.queue_item_id < 0 { - None - } else { - Some(t) - } - }); - let next_track = - get_bytes_field(&fields, 5) - .map(parse_queue_track) - .and_then(|t| { - if t.track_id <= 0 || t.queue_item_id < 0 { - None - } else { - Some(t) - } - }); + info!("[QConnect] Sending renderer join"); + let msg = build_renderer_join_session(session_uuid, &self.config, playing_state); + self.send(msg).await; - info!("[RECV] SET_STATE: playing_state={:?}, position_ms={:?}, current_track={:?}, next_track={:?}, queue_ver={}", - playing_state, position_ms, current_track, next_track, queue_version_major); + { + let mut local = self.local.lock().await; + local.joined = true; + } - QConnectCommand::SetState { - playing_state, - position_ms, - current_track, - next_track, - queue_version_major, - } - } - // SRVR_RNDR_SET_VOLUME (42) - 42 => { - let fields = parse_fields(&payload); - let volume = get_varint_field(&fields, 1).map(|v| v as u32); - let delta = get_varint_field(&fields, 2).map(|v| v as i32); - QConnectCommand::SetVolume { volume, delta } - } - // SRVR_RNDR_SET_ACTIVE (43) - 43 => { - let fields = parse_fields(&payload); - let active = get_varint_field(&fields, 1).unwrap_or(0) != 0; - QConnectCommand::SetActive { active } - } - // SRVR_RNDR_SET_LOOP_MODE (45) - 45 => { - let fields = parse_fields(&payload); - let mode = get_varint_field(&fields, 1).unwrap_or(0) as u32; - QConnectCommand::SetLoopMode(mode) - } - // SRVR_RNDR_SET_SHUFFLE_MODE (46) - 46 => { - let fields = parse_fields(&payload); - let mode = get_varint_field(&fields, 1).unwrap_or(0) as u32; - QConnectCommand::SetShuffleMode(mode) - } - // SRVR_RNDR_MUTE_VOLUME (47) - 47 => { - let fields = parse_fields(&payload); - let muted = get_varint_field(&fields, 1).unwrap_or(0) != 0; - QConnectCommand::MuteVolume(muted) - } - // SRVR_RNDR_SET_MAX_AUDIO_QUALITY (44) - 44 => { - let fields = parse_fields(&payload); - let quality = get_varint_field(&fields, 1).unwrap_or(0) as u32; - QConnectCommand::SetMaxAudioQuality(quality) - } - other => { - info!( - "[RECV] Unknown msg type {}: payload {} bytes = {:02x?}", - other, - payload.len(), - &payload[..payload.len().min(64)] - ); - QConnectCommand::Unknown(other) - } + // Send initial volume report + let vol = self.local.lock().await.volume; + self.send(build_volume_changed(vol as i32)).await; + info!("[QConnect] Renderer joined, initial volume={vol} sent"); + } + + async fn send_state_report( + &self, + playing_state: i32, + position_ms: u64, + duration_ms: u64, + queue_item_id: i32, + next_queue_item_id: i32, + ) { + debug!( + "[QConnect] StateReport: playing={playing_state} pos={position_ms}ms dur={duration_ms}ms qi={queue_item_id} nqi={next_queue_item_id}" + ); + self.send(build_state_updated( + playing_state, + position_ms, + duration_ms, + queue_item_id, + next_queue_item_id, + )) + .await; + } + + async fn dispatch_inbound(&self, batch: QConnectMessages) { + for msg in batch.messages { + let Some(mt) = msg.message_type.or_else(|| resolve_message_type_from_fields(&msg)) + else { + continue; }; - debug!("QConnect command: {:?}", cmd); - cmds.push(cmd); - } - } - cmds -} -// --------------------------------------------------------------------------- -// QConnect public API -// --------------------------------------------------------------------------- - -pub struct QConnect { - cmd_rx: mpsc::Receiver, -} - -impl QConnect { - pub fn start(auth_token: String, device_uuid: String, device_name: String) -> Self { - let (cmd_tx, cmd_rx) = mpsc::channel::(64); - - tokio::spawn(async move { - qconnect_task(auth_token, device_uuid, device_name, cmd_tx).await; - }); - - Self { cmd_rx } - } - - pub fn poll_command(&mut self) -> Option { - self.cmd_rx.try_recv().ok() - } -} - -// --------------------------------------------------------------------------- -// Connection logic -// --------------------------------------------------------------------------- - -async fn qconnect_task( - auth_token: String, - device_uuid: String, - device_name: String, - cmd_tx: mpsc::Sender, -) { - let mut backoff = 5u64; - loop { - info!("QConnect: connecting..."); - match run_connection(&auth_token, &device_uuid, &device_name, &cmd_tx).await { - Ok(()) => { - info!("QConnect: disconnected cleanly"); - backoff = 5; + if mt == QConnectMessageType::MessageTypeSrvrCtrlSessionState as i32 { + if let Some(ss) = msg.srvr_ctrl_session_state { + if let Some(uuid_bytes) = ss.session_uuid { + let should_join = !self.local.lock().await.joined; + if should_join { + info!("[QConnect] Got session UUID from SessionState"); + let h = self.clone(); + tokio::spawn(async move { + h.join_session(uuid_bytes).await; + }); + } + } + } + continue; } - Err(e) => { - error!("QConnect: error: {}", e); - } - } - info!("QConnect: reconnecting in {}s", backoff); - tokio::time::sleep(std::time::Duration::from_secs(backoff)).await; - backoff = (backoff * 2).min(120); - } -} -async fn get_session_uuid( - api: &QobuzApi, - auth_token: &str, - device_uuid: &str, - device_name: &str, -) -> Result> { - let token_resp = api.get_qws_token(auth_token).await?; - let jwt = token_resp.jwt_qws.jwt; - let endpoint = &token_resp.jwt_qws.endpoint; - - info!("QConnect ctrl: connecting to {}", endpoint); - let (ws, _) = tokio_tungstenite::connect_async(endpoint).await?; - let (mut tx, mut rx) = ws.split(); - - // Auth - tx.send(Message::Binary(build_auth_frame(1, &jwt).into())) - .await?; - if let Some(r) = rx.next().await { - r?; - } - - // Subscribe - tx.send(Message::Binary(build_subscribe_frame(2).into())) - .await?; - if let Some(r) = rx.next().await { - r?; - } - - // Send ctrl_join_session - let ctrl_join = build_payload_frame(3, &msg_ctrl_join_session(device_uuid, device_name)); - tx.send(Message::Binary(ctrl_join.into())).await?; - - // Wait for session UUID in response - let deadline = std::time::Instant::now() + std::time::Duration::from_secs(10); - loop { - let remaining = deadline - .checked_duration_since(std::time::Instant::now()) - .ok_or_else(|| anyhow::anyhow!("timeout waiting for session UUID"))?; - - match tokio::time::timeout(remaining, rx.next()).await { - Ok(Some(Ok(Message::Binary(data)))) => { - for (frame_type, frame_body) in decode_all_frames(&data) { - if frame_type != 6 { + if mt == QConnectMessageType::MessageTypeSrvrRndrSetState as i32 { + if let Some(payload) = msg.srvr_rndr_set_state { + // Echo detection: server echoes our state reports as SET_STATE with only + // next_track set. Skip those — but not seek commands (current_position set). + if payload.playing_state.is_none() + && payload.current_track.is_none() + && payload.current_position.is_none() + { + debug!("[QConnect] Skipping echo SET_STATE"); continue; } - // Look for session UUID in the SRVR_CTRL_SESSION_STATE (81) response - for (msg_type, payload) in extract_qconnect_messages(&frame_body) { - if msg_type == 81 { - // Session state — look for session UUID - let fields = parse_fields(&payload); - // Field 7 in session state might have device info - // Field 1 might be session UUID - if let Some(uuid_bytes) = get_bytes_field(&fields, 1) { - if uuid_bytes.len() == 16 { - info!("Got session UUID from msg type 81"); - return Ok(uuid_bytes.to_vec()); - } - } + let h = self.clone(); + tokio::spawn(async move { + h.handle_set_state(payload).await; + }); + } + continue; + } + + if mt == QConnectMessageType::MessageTypeSrvrRndrSetVolume as i32 { + if let Some(payload) = msg.srvr_rndr_set_volume { + let h = self.clone(); + tokio::spawn(async move { + h.handle_set_volume(payload).await; + }); + } + continue; + } + + if mt == QConnectMessageType::MessageTypeSrvrRndrSetActive as i32 { + if let Some(payload) = msg.srvr_rndr_set_active { + let h = self.clone(); + tokio::spawn(async move { + h.handle_set_active(payload).await; + }); + } + continue; + } + + if mt == QConnectMessageType::MessageTypeSrvrRndrSetMaxAudioQuality as i32 { + if let Some(payload) = msg.srvr_rndr_set_max_audio_quality { + let h = self.clone(); + tokio::spawn(async move { + h.handle_set_max_audio_quality(payload).await; + }); + } + continue; + } + + if mt == QConnectMessageType::MessageTypeSrvrRndrSetLoopMode as i32 { + if let Some(payload) = msg.srvr_rndr_set_loop_mode { + info!("[QConnect] SetLoopMode: {:?}", payload.loop_mode); + } + continue; + } + + if mt == QConnectMessageType::MessageTypeSrvrRndrSetShuffleMode as i32 { + if let Some(payload) = msg.srvr_rndr_set_shuffle_mode { + info!("[QConnect] SetShuffleMode: {:?}", payload.shuffle_mode); + } + continue; + } + + if mt == QConnectMessageType::MessageTypeSrvrRndrMuteVolume as i32 { + if let Some(payload) = msg.srvr_rndr_mute_volume { + let h = self.clone(); + tokio::spawn(async move { + h.handle_mute_volume(payload).await; + }); + } + continue; + } + } + } + + async fn handle_set_state(&self, payload: RendererSetStateMessage) { + let playing_state = payload.playing_state; + let current_position_ms = payload.current_position.map(|p| p.max(0) as u64); + let current_track = payload.current_track; + let next_track = payload.next_track; + + info!( + "[QConnect] SET_STATE: playing_state={:?} pos={:?} current_track={:?} next_track={:?}", + playing_state, + current_position_ms, + current_track + .as_ref() + .map(|t| (t.track_id, t.queue_item_id)), + next_track.as_ref().map(|t| (t.track_id, t.queue_item_id)), + ); + + // Store next_track metadata + if let Some(nt) = &next_track { + let mut local = self.local.lock().await; + local.current_next_queue_item_id = nt.queue_item_id.unwrap_or(-1); + } + + // Seek-only detection (no playing_state, no current/next track, position present) + let seek_only = playing_state.is_none() + && current_track.is_none() + && next_track.is_none() + && current_position_ms.is_some(); + + if seek_only { + let target_pos = current_position_ms.unwrap_or(0); + let player_pos = self.player.status().position_ms; + + let (qi, nqi, ps, dur) = { + let mut local = self.local.lock().await; + let suppress = target_pos > 0 + && local + .ignore_seek_until + .map(|deadline| Instant::now() < deadline) + .unwrap_or(false); + let should_seek = + !suppress && (target_pos == 0 || target_pos.abs_diff(player_pos) > 350); + if should_seek { + self.player.send(PlayerCommand::Seek(target_pos)); + local.track_ended = false; + if target_pos == 0 { + local.ignore_seek_until = + Some(Instant::now() + Duration::from_secs(2)); + } + } + info!( + "[QConnect] seek-only: target={target_pos}ms local={player_pos}ms should_seek={should_seek} suppressed={suppress}" + ); + ( + local.current_queue_item_id, + local.current_next_queue_item_id, + local.current_playing_state, + local.current_duration_ms, + ) + }; + self.send_state_report(ps, target_pos, dur, qi, nqi).await; + return; + } + + // New track detection + // Sentinel value: track_id=u32::MAX means "no track" (queue cleared by another client) + let queue_cleared = current_track + .as_ref() + .map_or(false, |t| t.track_id.unwrap_or(0) == u32::MAX); + + if queue_cleared { + debug!("[QConnect] Queue cleared (sentinel track), stopping playback"); + self.player.send(PlayerCommand::Stop); + let mut local = self.local.lock().await; + local.current_track_id = -1; + local.current_queue_item_id = -1; + local.current_next_queue_item_id = -1; + local.current_playing_state = 1; + local.current_duration_ms = 0; + local.track_ended = false; + return; + } + + let new_track_info: Option<(i32, i32, i32, u64, i32)> = { + let local = self.local.lock().await; + if let Some(track) = ¤t_track { + let tid = track.track_id.unwrap_or(0) as i32; + let qid = track.queue_item_id.unwrap_or(-1); + if tid != local.current_track_id || qid != local.current_queue_item_id { + Some(( + tid, + qid, + local.max_audio_quality, + current_position_ms.unwrap_or(0), + local.current_next_queue_item_id, + )) + } else { + None + } + } else { + None + } + }; + + if let Some((tid, qid, max_q, start_pos, nqi_snap)) = new_track_info { + info!("[QConnect] Loading new track {tid} (qi={qid})"); + { + let mut local = self.local.lock().await; + local.current_track_id = tid; + local.current_queue_item_id = qid; + local.current_playing_state = 2; + local.current_duration_ms = 0; + local.last_play_at = Instant::now(); + local.has_position_progress = false; + local.track_ended = false; + local.ignore_seek_until = None; + } + + // Send BUFFERING state immediately + self.send_state_report(2, start_pos, 0, qid, nqi_snap).await; + + let track_id_str = tid.to_string(); + let format_id = quality_to_format_id(max_q); + + let duration_ms = match self.api.get_track(&self.auth_token, &track_id_str).await { + Ok(t) => t.duration.unwrap_or(0) as u64 * 1000, + Err(e) => { + warn!("[QConnect] get_track failed: {e}"); + 0 + } + }; + + match self + .api + .get_track_stream(&self.auth_token, &track_id_str, format_id) + .await + { + Ok(stream) => { + let (player_stream, stream_sr, stream_bits, stream_ch) = match stream { + TrackStream::DirectUrl { + url, + sampling_rate_hz, + bit_depth, + channels, + } => { + info!( + "[QConnect] Got direct stream URL (duration={duration_ms}ms)" + ); + ( + StreamSource::DirectUrl(url), + sampling_rate_hz, + bit_depth, + channels, + ) } + TrackStream::Segmented { + url_template, + n_segments, + encryption_key_hex, + sampling_rate_hz, + bit_depth, + channels, + } => { + info!("[QConnect] Got segmented stream (segments={n_segments}, duration={duration_ms}ms)"); + ( + StreamSource::Segmented { + url_template, + n_segments, + encryption_key_hex, + }, + sampling_rate_hz, + bit_depth, + channels, + ) + } + }; + + self.player.send(PlayerCommand::Play { + stream: player_stream, + track_id: tid, + queue_item_id: qid, + duration_ms, + start_position_ms: start_pos, + }); + + { + let mut loc = self.local.lock().await; + loc.current_duration_ms = duration_ms; } - // Fallback: scan frame field 7 deeply for any 16-byte UUID - let frame_fields = parse_fields(&frame_body); - if let Some(f7) = get_bytes_field(&frame_fields, 7) { - if let Some(uuid) = find_session_uuid(f7) { - info!("Got session UUID from deep scan"); - return Ok(uuid); + let (fallback_sr, fallback_bits, fallback_ch) = + quality_fallback_audio_params(max_q); + let sr = stream_sr.unwrap_or(fallback_sr).max(1); + let bits = stream_bits.unwrap_or(fallback_bits).max(1); + let ch = stream_ch.unwrap_or(fallback_ch).max(1); + + self.send(build_file_quality_changed( + sr as i32, + bits as i32, + ch as i32, + max_q, + )) + .await; + self.send(build_max_quality_changed(max_q)).await; + self.send_state_report(2, start_pos, duration_ms, qid, nqi_snap) + .await; + } + Err(e) => { + error!("[QConnect] Failed to get stream URL: {e}"); + { + let mut loc = self.local.lock().await; + loc.current_playing_state = 1; + } + self.send_state_report(1, 0, 0, qid, nqi_snap).await; + } + } + return; + } + + // playing_state only (no new track) + if let Some(ps) = playing_state { + let start_pos = current_position_ms.unwrap_or(0); + let (cur_ps, cur_pos, dur, qi, nqi) = { + let mut local = self.local.lock().await; + match ps { + 2 => { + let status = self.player.status(); + let should_restart = current_track.is_some() + && (local.track_ended || status.state == PlayerState::Stopped); + + if should_restart { + info!( + "[QConnect] Restarting current track from {start_pos}ms (ended={} state={:?})", + local.track_ended, status.state + ); + self.player.send(PlayerCommand::Seek(start_pos)); + local.current_playing_state = 2; + local.track_ended = false; + } else if local.current_playing_state == 3 { + info!("[QConnect] Resuming playback"); + self.player.send(PlayerCommand::Resume); + local.current_playing_state = 2; + local.track_ended = false; + } else if local.current_playing_state != 2 { + info!( + "[QConnect] Play requested, state={}", + local.current_playing_state + ); + local.current_playing_state = 2; } } + 3 => { + info!("[QConnect] Pausing playback"); + self.player.send(PlayerCommand::Pause); + local.current_playing_state = 3; + } + 1 => { + info!("[QConnect] Stopping playback"); + self.player.send(PlayerCommand::Stop); + local.current_playing_state = 1; + local.current_queue_item_id = -1; + local.current_next_queue_item_id = -1; + local.current_track_id = 0; + local.track_ended = false; + } + _ => {} + } + + let pos_out = match ps { + 1 => 0, + _ => current_position_ms.unwrap_or_else(|| self.player.status().position_ms), + }; + ( + local.current_playing_state, + pos_out, + local.current_duration_ms, + local.current_queue_item_id, + local.current_next_queue_item_id, + ) + }; + + self.send_state_report(cur_ps, cur_pos, dur, qi, nqi).await; + return; + } + + // Position control with no playing_state and no track change + let (ps, pos, dur, qi, nqi) = { + let mut local = self.local.lock().await; + if let Some(p) = current_position_ms { + let player_pos = self.player.status().position_ms; + let suppress = p > 0 + && local + .ignore_seek_until + .map(|deadline| Instant::now() < deadline) + .unwrap_or(false); + let should_seek = !suppress && (p == 0 || p.abs_diff(player_pos) > 350); + if should_seek { + info!("[QConnect] Position jump: seeking to {p}ms (local={player_pos}ms)"); + self.player.send(PlayerCommand::Seek(p)); + local.track_ended = false; + if p == 0 { + local.ignore_seek_until = + Some(Instant::now() + Duration::from_secs(2)); + } } } - Ok(Some(Ok(Message::Ping(data)))) => { - tx.send(Message::Pong(data)).await?; + let pos_out = + current_position_ms.unwrap_or_else(|| self.player.status().position_ms); + ( + local.current_playing_state, + pos_out, + local.current_duration_ms, + local.current_queue_item_id, + local.current_next_queue_item_id, + ) + }; + + self.send_state_report(ps, pos, dur, qi, nqi).await; + } + + async fn handle_set_volume(&self, payload: RendererSetVolumeMessage) { + let mut local = self.local.lock().await; + let new_vol = if let Some(v) = payload.volume { + v.clamp(0, 100) as u8 + } else if let Some(d) = payload.volume_delta { + (local.volume as i32 + d).clamp(0, 100) as u8 + } else { + local.volume + }; + info!("[QConnect] SetVolume: {new_vol}"); + local.volume = new_vol; + if local.muted && new_vol > 0 { + local.muted = false; + } + drop(local); + self.player.send(PlayerCommand::SetVolume(new_vol)); + self.send(build_volume_changed(new_vol as i32)).await; + } + + async fn handle_set_active(&self, payload: RendererSetActiveMessage) { + let active = payload.active.unwrap_or(true); + info!("[QConnect] SetActive: {active}"); + if !active { + let mut local = self.local.lock().await; + local.current_playing_state = 1; + local.current_queue_item_id = -1; + local.current_next_queue_item_id = -1; + local.current_track_id = 0; + let nqi = local.current_next_queue_item_id; + drop(local); + self.player.send(PlayerCommand::Stop); + self.send_state_report(1, 0, 0, -1, nqi).await; + } + } + + async fn handle_mute_volume(&self, payload: RendererMuteVolumeMessage) { + let value = payload.value.unwrap_or(false); + info!("[QConnect] MuteVolume: {value}"); + let mut local = self.local.lock().await; + if value { + local.pre_mute_volume = local.volume; + local.volume = 0; + local.muted = true; + } else { + local.volume = local.pre_mute_volume; + local.muted = false; + } + let vol = local.volume; + drop(local); + self.player.send(PlayerCommand::SetVolume(vol)); + self.send(build_volume_muted(value)).await; + } + + async fn handle_set_max_audio_quality(&self, payload: RendererSetMaxAudioQualityMessage) { + let max_audio_quality = payload.max_audio_quality.unwrap_or(4); + let format_id = quality_to_format_id(max_audio_quality); + info!("[QConnect] SetMaxAudioQuality: {max_audio_quality} (format_id={format_id})"); + + { + let mut local = self.local.lock().await; + local.max_audio_quality = max_audio_quality; + } + + self.send(build_max_quality_changed(max_audio_quality)).await; + + let (tid, qi, nqi, cur_ps, dur) = { + let local = self.local.lock().await; + ( + local.current_track_id, + local.current_queue_item_id, + local.current_next_queue_item_id, + local.current_playing_state, + local.current_duration_ms, + ) + }; + + if cur_ps == 2 && tid != 0 { + let restart_pos = self.player.status().position_ms; + info!("[QConnect] Restarting track {tid} at new quality format_id={format_id}"); + + self.send_state_report(2, restart_pos, dur, qi, nqi).await; + + let track_id_str = tid.to_string(); + let duration_ms = match self.api.get_track(&self.auth_token, &track_id_str).await { + Ok(t) => t.duration.unwrap_or(0) as u64 * 1000, + Err(e) => { + warn!("[QConnect] get_track failed: {e}"); + dur + } + }; + + match self + .api + .get_track_stream(&self.auth_token, &track_id_str, format_id) + .await + { + Ok(stream) => { + let (player_stream, stream_sr, stream_bits, stream_ch) = match stream { + TrackStream::DirectUrl { + url, + sampling_rate_hz, + bit_depth, + channels, + } => (StreamSource::DirectUrl(url), sampling_rate_hz, bit_depth, channels), + TrackStream::Segmented { + url_template, + n_segments, + encryption_key_hex, + sampling_rate_hz, + bit_depth, + channels, + } => ( + StreamSource::Segmented { + url_template, + n_segments, + encryption_key_hex, + }, + sampling_rate_hz, + bit_depth, + channels, + ), + }; + + self.player.send(PlayerCommand::Play { + stream: player_stream, + track_id: tid, + queue_item_id: qi, + duration_ms, + start_position_ms: restart_pos, + }); + + { + let mut loc = self.local.lock().await; + loc.current_duration_ms = duration_ms; + } + + let (fallback_sr, fallback_bits, fallback_ch) = + quality_fallback_audio_params(max_audio_quality); + let sr = stream_sr.unwrap_or(fallback_sr).max(1); + let bits = stream_bits.unwrap_or(fallback_bits).max(1); + let ch = stream_ch.unwrap_or(fallback_ch).max(1); + + self.send(build_file_quality_changed( + sr as i32, + bits as i32, + ch as i32, + max_audio_quality, + )) + .await; + self.send(build_max_quality_changed(max_audio_quality)).await; + self.send_state_report(2, restart_pos, duration_ms, qi, nqi) + .await; + info!("[QConnect] Restarted at format_id={format_id}"); + } + Err(e) => { + error!("[QConnect] Failed to get stream for quality change: {e}"); + let mut loc = self.local.lock().await; + loc.current_playing_state = 1; + drop(loc); + self.send_state_report(1, 0, duration_ms, qi, nqi).await; + } } - Ok(Some(Ok(_))) => {} - Ok(Some(Err(e))) => bail!("ctrl connection error: {}", e), - _ => bail!("timeout waiting for session UUID"), } } } -/// Recursively search for a 16-byte blob that looks like a session UUID. -fn find_session_uuid(data: &[u8]) -> Option> { - let fields = parse_fields(data); - if let Some(candidate) = get_bytes_field(&fields, 1) { - // Prefer structures that look like SessionState payloads: - // field 1 = 16-byte UUID, field 2 = enum/int varint. - if candidate.len() == 16 && get_varint_field(&fields, 2).is_some() { - return Some(candidate.to_vec()); - } +// Fallback: resolve message type from which field is populated (mirrors qbz decoder) +fn resolve_message_type_from_fields(msg: &QConnectMessage) -> Option { + if msg.srvr_rndr_set_state.is_some() { + return Some(QConnectMessageType::MessageTypeSrvrRndrSetState as i32); } - - for (_, wt, field_data) in &fields { - if *wt == 2 { - if let Some(found) = find_session_uuid(field_data) { - return Some(found); - } - } + if msg.srvr_rndr_set_volume.is_some() { + return Some(QConnectMessageType::MessageTypeSrvrRndrSetVolume as i32); + } + if msg.srvr_rndr_set_active.is_some() { + return Some(QConnectMessageType::MessageTypeSrvrRndrSetActive as i32); + } + if msg.srvr_rndr_set_max_audio_quality.is_some() { + return Some(QConnectMessageType::MessageTypeSrvrRndrSetMaxAudioQuality as i32); + } + if msg.srvr_rndr_set_loop_mode.is_some() { + return Some(QConnectMessageType::MessageTypeSrvrRndrSetLoopMode as i32); + } + if msg.srvr_rndr_set_shuffle_mode.is_some() { + return Some(QConnectMessageType::MessageTypeSrvrRndrSetShuffleMode as i32); + } + if msg.srvr_rndr_mute_volume.is_some() { + return Some(QConnectMessageType::MessageTypeSrvrRndrMuteVolume as i32); + } + if msg.srvr_ctrl_session_state.is_some() { + return Some(QConnectMessageType::MessageTypeSrvrCtrlSessionState as i32); } None } -async fn run_connection( - auth_token: &str, - device_uuid: &str, - device_name: &str, - cmd_tx: &mpsc::Sender, -) -> Result<()> { - let config = Config::load().map_err(|e| anyhow::anyhow!("{}", e))?; - let api = QobuzApi::new(&config); +// --------------------------------------------------------------------------- +// Public API +// --------------------------------------------------------------------------- - // 1. Get session UUID via ctrl connection - info!("QConnect: getting session UUID..."); - let session_uuid = get_session_uuid(&api, auth_token, device_uuid, device_name).await?; - info!("QConnect: got session UUID ({} bytes)", session_uuid.len()); +pub struct QConnect { + _handle: tokio::task::JoinHandle<()>, +} - // 2. Open renderer connection +impl QConnect { + pub fn start(auth_token: String, _device_uuid: String, _device_name: String) -> Self { + let handle = tokio::spawn(async move { + let config = match Config::load() { + Ok(c) => c, + Err(e) => { + error!("[QConnect] Failed to load config: {e}"); + return; + } + }; + run_qconnect(auth_token, config).await; + }); + Self { _handle: handle } + } +} + +pub async fn run_qconnect(auth_token: String, config: Config) { + let mut backoff = 5u64; + loop { + info!("[QConnect] Connecting..."); + match run_qconnect_once(&auth_token, &config).await { + Ok(()) => { + info!("[QConnect] Disconnected cleanly"); + backoff = 5; + } + Err(e) => { + error!("[QConnect] Error: {e}"); + } + } + info!("[QConnect] Reconnecting in {backoff}s"); + tokio::time::sleep(Duration::from_secs(backoff)).await; + backoff = (backoff * 2).min(120); + } +} + +async fn run_qconnect_once(auth_token: &str, config: &Config) -> anyhow::Result<()> { + let api = Arc::new(QobuzApi::new(config)); let token_resp = api.get_qws_token(auth_token).await?; let jwt = token_resp.jwt_qws.jwt; - let endpoint = &token_resp.jwt_qws.endpoint; + let endpoint = token_resp.jwt_qws.endpoint; - info!("QConnect renderer: connecting to {}", endpoint); - let (ws, _) = tokio_tungstenite::connect_async(endpoint).await?; - let (mut ws_tx, mut ws_rx) = ws.split(); + info!("[QConnect] Connecting to {endpoint}"); - let mut msg_id: u64 = 1; + let (ws, _) = connect_async_with_config( + &endpoint, + Some(WebSocketConfig::default()), + false, + ) + .await + .map_err(|e| anyhow::anyhow!("WebSocket connect failed: {e}"))?; - // Auth - ws_tx - .send(Message::Binary(build_auth_frame(msg_id, &jwt).into())) - .await?; - msg_id += 1; - if let Some(r) = ws_rx.next().await { - r?; - } + let (mut ws_write, mut ws_read) = ws.split(); + let mut msg_id: u32 = 0; + + // Authenticate + let auth_frame = build_auth_frame(&jwt, &mut msg_id); + ws_write + .send(WsMessage::Binary(auth_frame.into())) + .await + .map_err(|e| anyhow::anyhow!("auth send failed: {e}"))?; // Subscribe - ws_tx - .send(Message::Binary(build_subscribe_frame(msg_id).into())) - .await?; - msg_id += 1; - if let Some(r) = ws_rx.next().await { - r?; - } + let sub_frame = build_subscribe_frame(&mut msg_id); + ws_write + .send(WsMessage::Binary(sub_frame.into())) + .await + .map_err(|e| anyhow::anyhow!("subscribe send failed: {e}"))?; - // Join session as renderer - let join_msg = msg_renderer_join_session(device_uuid, device_name, &session_uuid); - ws_tx - .send(Message::Binary( - build_payload_frame(msg_id, &join_msg).into(), - )) - .await?; - msg_id += 1; + let player = Arc::new(AudioPlayer::new()); + let local = Arc::new(Mutex::new(RendererLocal::default())); + let (out_tx, mut out_rx) = mpsc::channel::(64); - // Read join response - for _ in 0..5 { - match tokio::time::timeout(std::time::Duration::from_secs(5), ws_rx.next()).await { - Ok(Some(Ok(Message::Binary(data)))) => { - for (frame_type, frame_body) in decode_all_frames(&data) { - if frame_type != 6 { - continue; - } - for (mt, payload) in extract_qconnect_messages(&frame_body) { - if mt == 1 { - // Error - let fields = parse_fields(&payload); - let code = get_bytes_field(&fields, 1) - .and_then(|b| std::str::from_utf8(b).ok()) - .unwrap_or("?"); - let message = get_bytes_field(&fields, 2) - .and_then(|b| std::str::from_utf8(b).ok()) - .unwrap_or("?"); - bail!("renderer join rejected: {} — {}", code, message); - } - if mt == 43 { - info!("QConnect: renderer joined (SET_ACTIVE received)"); - } - } - } - break; - } - Ok(Some(Ok(Message::Ping(data)))) => { - ws_tx.send(Message::Pong(data)).await?; - } - Ok(Some(Ok(_))) => break, - Ok(Some(Err(e))) => bail!("WS error on join: {}", e), - _ => break, - } - } - info!("QConnect: joined session as renderer"); + let handler = RendererHandler { + api: Arc::clone(&api), + auth_token: Arc::new(auth_token.to_string()), + player: Arc::clone(&player), + local: Arc::clone(&local), + out_tx, + config: Arc::new(config.clone()), + }; - // Send initial state (stopped, buffer_state=OK) and volume + // Send ctrl join after subscribe { - let state_msg = msg_state_updated(1, 2, 0, 0, -1, -1); - ws_tx - .send(Message::Binary( - build_payload_frame(msg_id, &state_msg).into(), - )) - .await?; - msg_id += 1; - - let vol_msg = msg_volume_changed(100); - ws_tx - .send(Message::Binary( - build_payload_frame(msg_id, &vol_msg).into(), - )) - .await?; - msg_id += 1; + let ctrl_join = build_ctrl_join_session(config); + let frame = build_payload_frame(ctrl_join, &mut msg_id); + ws_write + .send(WsMessage::Binary(frame.into())) + .await + .map_err(|e| anyhow::anyhow!("ctrl join send failed: {e}"))?; + info!("[QConnect] Subscribed — sent CtrlSrvrJoinSession"); } - // Create audio player - let player = AudioPlayer::new(); - - // Local state tracking (optimistic — reflects what we've been told to do) - let mut current_playing_state: u64 = 1; // 1=stopped, 2=playing, 3=paused - let mut current_queue_item_id: i32 = -1; - let mut current_next_queue_item_id: i32 = -1; - let mut current_position_ms: u64 = 0; - let mut current_duration_ms: u64 = 0; - let mut current_buffer_state: u64 = 2; // 2=OK per proto - let mut volume: u8 = 100; - let mut muted = false; - let mut pre_mute_volume: u8 = 100; - let mut max_audio_quality: u32 = 4; // proto quality value 4 = Hi-Res 192 - let mut current_track_id: i32 = 0; // track_id of currently playing track - let mut last_play_command_at: std::time::Instant = std::time::Instant::now(); - let mut has_seen_position_progress = false; // true once we've seen pos > 0 after a Play - let mut track_ended = false; // true when player finishes track naturally - let mut ignore_nonzero_seek_until: Option = None; - - // Helper macro: send a state update - macro_rules! send_state { - ($ws_tx:expr, $msg_id:expr) => {{ - debug!( - "[SEND] StateUpdated: playing={} buffer={} pos={}ms dur={}ms qi={} nqi={}", - current_playing_state, - current_buffer_state, - current_position_ms, - current_duration_ms, - current_queue_item_id, - current_next_queue_item_id - ); - let sm = msg_state_updated( - current_playing_state, - current_buffer_state, - current_position_ms, - current_duration_ms, - current_queue_item_id, - current_next_queue_item_id, - ); - $ws_tx - .send(Message::Binary(build_payload_frame($msg_id, &sm).into())) - .await?; - $msg_id += 1; - }}; - } - - info!("QConnect: entering main loop"); - let mut position_ticker = tokio::time::interval(std::time::Duration::from_millis(500)); + let mut position_ticker = tokio::time::interval(Duration::from_millis(500)); position_ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + loop { tokio::select! { _ = position_ticker.tick() => { - if current_playing_state == 2 { + let (playing_state, last_play_at, has_progress, track_ended_local, dur, qi, nqi) = { + let loc = local.lock().await; + ( + loc.current_playing_state, + loc.last_play_at, + loc.has_position_progress, + loc.track_ended, + loc.current_duration_ms, + loc.current_queue_item_id, + loc.current_next_queue_item_id, + ) + }; + + if playing_state == 2 { let status = player.status(); - let elapsed_since_play = last_play_command_at.elapsed(); + let elapsed = last_play_at.elapsed(); if status.state == PlayerState::Stopped - && has_seen_position_progress - && elapsed_since_play > std::time::Duration::from_secs(3) + && has_progress + && elapsed > Duration::from_secs(3) { - if !track_ended { - // Track ended naturally — send final position then request next track - info!("[TICK] Track ended naturally, sending ACTION_TYPE_NEXT"); - track_ended = true; - current_position_ms = current_duration_ms; - send_state!(ws_tx, msg_id); - - // Tell server to advance to next track (ACTION_TYPE_NEXT = 2) - let action_msg = msg_renderer_action(2, None); - ws_tx.send(Message::Binary(build_payload_frame(msg_id, &action_msg).into())).await?; - msg_id += 1; + if !track_ended_local { + info!("[QConnect] Track ended naturally, nqi={nqi}"); + { + let mut loc = local.lock().await; + loc.track_ended = true; + loc.current_playing_state = 1; + } + handler.send_state_report(1, dur, dur, qi, nqi).await; + if nqi > 0 { + // There is a next track — ask server to advance + handler.send(build_renderer_action_next()).await; + } + // If nqi <= 0, just stay stopped; server will keep loop mode off } - // Don't spam — wait for server to send new SET_STATE - } else if status.state == PlayerState::Stopped { - debug!("[TICK] Player stopped but grace period (elapsed={:?}, progress={}), ignoring", - elapsed_since_play, has_seen_position_progress); - } else { + } else if status.state != PlayerState::Stopped { let new_pos = status.position_ms; - if new_pos > 0 && !has_seen_position_progress { - has_seen_position_progress = true; - info!("[TICK] First position progress: {}ms", new_pos); - } - if new_pos != current_position_ms { - current_position_ms = new_pos; - send_state!(ws_tx, msg_id); + if new_pos > 0 { + { + let mut loc = local.lock().await; + if !loc.has_position_progress { + loc.has_position_progress = true; + info!("[QConnect] First position progress: {new_pos}ms"); + } + } + // Periodic position report so the app's progress bar updates + handler.send_state_report(2, new_pos, dur, qi, nqi).await; } } } } - msg = ws_rx.next() => { - match msg { - Some(Ok(Message::Binary(data))) => { - let cmds = parse_incoming_commands(&data); - for cmd in cmds { - let _ = cmd_tx.try_send(cmd.clone()); - match &cmd { - QConnectCommand::SetState { - playing_state, - position_ms, - current_track, - next_track, - queue_version_major, - } => { - info!("[STATE] SET_STATE: playing_state={:?} current_track={:?} next_track={:?} pos={}", - playing_state, current_track.as_ref().map(|t| t.track_id), - next_track.as_ref().map(|t| t.track_id), position_ms.unwrap_or(0)); - - let requested_pos = position_ms.map(|p| p as u64); - - let seek_only_state = playing_state.is_none() - && current_track.is_none() - && next_track.is_none() - && *queue_version_major == 0 - && requested_pos.is_some(); - - if seek_only_state { - let target_pos = requested_pos.unwrap_or(0); - let status = player.status(); - let current_player_pos = status.position_ms; - let mut should_seek = target_pos == 0 - || target_pos.abs_diff(current_player_pos) > 350; - let suppress_nonzero_seek = target_pos > 0 - && ignore_nonzero_seek_until - .map(|deadline| std::time::Instant::now() < deadline) - .unwrap_or(false); - if suppress_nonzero_seek { - should_seek = false; - info!( - "[STATE] Ignoring non-zero seek {}ms during settle window", - target_pos - ); - } - - info!( - "[STATE] seek-only command: target={}ms local={}ms state={:?} track={} should_seek={}", - target_pos, - current_player_pos, - status.state, - status.track_id, - should_seek - ); - - if should_seek { - info!( - "[STATE] Applying seek to {}ms (local={}ms)", - target_pos, current_player_pos - ); - player.send(PlayerCommand::Seek(target_pos)); - track_ended = false; - if target_pos == 0 { - ignore_nonzero_seek_until = Some( - std::time::Instant::now() - + std::time::Duration::from_secs(2), - ); - } - } - - current_position_ms = target_pos; - send_state!(ws_tx, msg_id); - continue; - } - - // 1. Store next_track metadata - if let Some(nt) = next_track { - current_next_queue_item_id = nt.queue_item_id; - } - - // 2. Load new current_track if present and different - let mut loaded_new_track = false; - if let Some(track) = current_track { - if track.track_id != current_track_id || track.queue_item_id != current_queue_item_id { - info!("[STATE] Loading new track {} (qi={})", track.track_id, track.queue_item_id); - current_track_id = track.track_id; - current_queue_item_id = track.queue_item_id; - current_playing_state = 2; - current_buffer_state = 1; // BUFFERING - current_position_ms = requested_pos.unwrap_or(0); - current_duration_ms = 0; - last_play_command_at = std::time::Instant::now(); - has_seen_position_progress = false; - track_ended = false; - ignore_nonzero_seek_until = None; - send_state!(ws_tx, msg_id); - - let track_id_str = track.track_id.to_string(); - let format_id = quality_to_format_id(max_audio_quality); - let duration_ms = match api.get_track(auth_token, &track_id_str).await { - Ok(t) => t.duration.unwrap_or(0) as u64 * 1000, - Err(e) => { warn!("get_track failed: {}", e); 0 } - }; - current_duration_ms = duration_ms; - match api.get_track_stream(auth_token, &track_id_str, format_id).await { - Ok(stream) => { - let (player_stream, stream_sr, stream_bits, stream_ch) = match stream { - TrackStream::DirectUrl { - url, - sampling_rate_hz, - bit_depth, - channels, - } => { - info!("[STATE] Got direct stream URL (duration={}ms)", duration_ms); - ( - StreamSource::DirectUrl(url), - sampling_rate_hz, - bit_depth, - channels, - ) - } - TrackStream::Segmented { - url_template, - n_segments, - encryption_key_hex, - sampling_rate_hz, - bit_depth, - channels, - } => { - info!( - "[STATE] Got segmented stream (segments={}, duration={}ms)", - n_segments, duration_ms - ); - ( - StreamSource::Segmented { - url_template, - n_segments, - encryption_key_hex, - }, - sampling_rate_hz, - bit_depth, - channels, - ) - } - }; - player.send(PlayerCommand::Play { - stream: player_stream, - track_id: track.track_id, - queue_item_id: track.queue_item_id, - duration_ms, - start_position_ms: requested_pos.unwrap_or(0), - }); - let (fallback_sr, fallback_bits, fallback_ch) = - quality_fallback_audio_params(max_audio_quality); - let sr = stream_sr.unwrap_or(fallback_sr).max(1); - let bits = stream_bits.unwrap_or(fallback_bits).max(1); - let ch = stream_ch.unwrap_or(fallback_ch).max(1); - let file_msg = msg_file_audio_quality_changed( - sr as u64, - bits as u64, - ch as u64, - max_audio_quality as u64, - ); - ws_tx.send(Message::Binary(build_payload_frame(msg_id, &file_msg).into())).await?; - msg_id += 1; - let dev_msg = msg_device_audio_quality_changed( - sr as u64, - bits as u64, - ch as u64, - ); - ws_tx.send(Message::Binary(build_payload_frame(msg_id, &dev_msg).into())).await?; - msg_id += 1; - current_buffer_state = 2; // OK - } - Err(e) => { - error!("[STATE] Failed to get stream URL: {}", e); - current_playing_state = 1; - current_buffer_state = 2; // OK - } - } - loaded_new_track = true; - } - } - - // 3. Apply playing_state if present (and we didn't just load a new track) - if let Some(ps) = playing_state { - if !loaded_new_track { - match ps { - 2 => { - let status = player.status(); - let should_restart_same_track = current_track.is_some() - && (track_ended || status.state == PlayerState::Stopped); - - if should_restart_same_track { - let restart_pos = requested_pos.unwrap_or(0); - info!( - "[STATE] Restarting current track from {}ms (ended={} player_state={:?})", - restart_pos, - track_ended, - status.state - ); - player.send(PlayerCommand::Seek(restart_pos)); - current_playing_state = 2; - current_position_ms = restart_pos; - track_ended = false; - } else if current_playing_state == 3 { - info!("[STATE] Resuming playback"); - player.send(PlayerCommand::Resume); - current_playing_state = 2; - track_ended = false; - } else if current_playing_state != 2 { - info!("[STATE] Play requested but no new track, state={}", current_playing_state); - current_playing_state = 2; - } - } - 3 => { - info!("[STATE] Pausing playback"); - player.send(PlayerCommand::Pause); - current_playing_state = 3; - if let Some(pos) = requested_pos { - current_position_ms = pos; - } else { - current_position_ms = player.status().position_ms; - } - } - 1 => { - info!("[STATE] Stopping playback"); - player.send(PlayerCommand::Stop); - current_playing_state = 1; - current_position_ms = 0; - current_queue_item_id = -1; - current_next_queue_item_id = -1; - current_track_id = 0; - track_ended = false; - } - _ => {} - } - } - } - - // 4. Apply seek position if provided and not loading new track - let is_pause = matches!(playing_state, Some(3)); - let position_control_state = *queue_version_major == 0 - && current_track.is_none() - && next_track.is_none() - && requested_pos.is_some(); - if !loaded_new_track && !is_pause && position_control_state { - let requested = requested_pos.unwrap_or(0); - let status = player.status(); - let local = status.position_ms; - let mut should_seek = requested == 0 - || requested.abs_diff(local) > 350; - let suppress_nonzero_seek = requested > 0 - && ignore_nonzero_seek_until - .map(|deadline| std::time::Instant::now() < deadline) - .unwrap_or(false); - if suppress_nonzero_seek { - should_seek = false; - info!( - "[STATE] Ignoring non-zero position control {}ms during settle window", - requested - ); - } - info!( - "[STATE] position-control command: playing_state={:?} target={}ms local={}ms state={:?} track={} should_seek={}", - playing_state, - requested, - local, - status.state, - status.track_id, - should_seek - ); - - if should_seek { - info!( - "[STATE] Position jump detected, seeking to {}ms (local={}ms)", - requested, local - ); - player.send(PlayerCommand::Seek(requested)); - track_ended = false; - if requested == 0 { - ignore_nonzero_seek_until = Some( - std::time::Instant::now() - + std::time::Duration::from_secs(2), - ); - } - } - current_position_ms = requested; - } else if !loaded_new_track && !is_pause { - if let Some(pos) = requested_pos { - current_position_ms = pos; - } - } - - // 5. Always send state update (like reference implementation) - send_state!(ws_tx, msg_id); - } - - QConnectCommand::SetVolume { volume: vol, delta } => { - let new_vol = if let Some(v) = vol { - (*v).min(100) as u8 - } else if let Some(d) = delta { - (volume as i32 + d).clamp(0, 100) as u8 - } else { - volume - }; - info!("Setting volume to {}", new_vol); - volume = new_vol; - if muted && new_vol > 0 { muted = false; } - player.send(PlayerCommand::SetVolume(new_vol)); - let resp = msg_volume_changed(new_vol as u64); - ws_tx.send(Message::Binary(build_payload_frame(msg_id, &resp).into())).await?; - msg_id += 1; - } - - QConnectCommand::SetActive { active } => { - info!("SetActive: {}", active); - if !*active { - player.send(PlayerCommand::Stop); - current_playing_state = 1; - current_buffer_state = 2; // OK - current_position_ms = 0; - current_queue_item_id = -1; - current_next_queue_item_id = -1; - current_track_id = 0; - send_state!(ws_tx, msg_id); - } - } - - QConnectCommand::MuteVolume(mute) => { - info!("MuteVolume: {}", mute); - if *mute { - pre_mute_volume = volume; - volume = 0; - muted = true; - } else { - volume = pre_mute_volume; - muted = false; - } - player.send(PlayerCommand::SetVolume(volume)); - let resp = msg_volume_muted(*mute); - ws_tx.send(Message::Binary(build_payload_frame(msg_id, &resp).into())).await?; - msg_id += 1; - } - - QConnectCommand::SetMaxAudioQuality(quality) => { - let format_id = quality_to_format_id(*quality); - info!("SetMaxAudioQuality: {} (format_id={})", quality, format_id); - max_audio_quality = *quality; - - // Confirm quality change to server - let resp = msg_max_audio_quality_changed(*quality as u64, None); - ws_tx.send(Message::Binary(build_payload_frame(msg_id, &resp).into())).await?; - msg_id += 1; - - // If currently playing, restart at new quality - if current_playing_state == 2 && current_track_id != 0 { - let restart_pos = player.status().position_ms; - info!("Restarting track {} at new quality format_id={}", current_track_id, format_id); - current_buffer_state = 1; // BUFFERING - current_position_ms = restart_pos; - send_state!(ws_tx, msg_id); - - let track_id_str = current_track_id.to_string(); - let duration_ms = match api.get_track(auth_token, &track_id_str).await { - Ok(t) => t.duration.unwrap_or(0) as u64 * 1000, - Err(e) => { warn!("get_track failed: {}", e); current_duration_ms } - }; - current_duration_ms = duration_ms; - match api.get_track_stream(auth_token, &track_id_str, format_id).await { - Ok(stream) => { - let (player_stream, stream_sr, stream_bits, stream_ch) = match stream { - TrackStream::DirectUrl { - url, - sampling_rate_hz, - bit_depth, - channels, - } => { - ( - StreamSource::DirectUrl(url), - sampling_rate_hz, - bit_depth, - channels, - ) - } - TrackStream::Segmented { - url_template, - n_segments, - encryption_key_hex, - sampling_rate_hz, - bit_depth, - channels, - } => { - ( - StreamSource::Segmented { - url_template, - n_segments, - encryption_key_hex, - }, - sampling_rate_hz, - bit_depth, - channels, - ) - } - }; - player.send(PlayerCommand::Play { - stream: player_stream, - track_id: current_track_id, - queue_item_id: current_queue_item_id, - duration_ms, - start_position_ms: restart_pos, - }); - let (fallback_sr, fallback_bits, fallback_ch) = - quality_fallback_audio_params(*quality); - let sr = stream_sr.unwrap_or(fallback_sr).max(1); - let bits = stream_bits.unwrap_or(fallback_bits).max(1); - let ch = stream_ch.unwrap_or(fallback_ch).max(1); - let file_msg = msg_file_audio_quality_changed( - sr as u64, - bits as u64, - ch as u64, - *quality as u64, - ); - ws_tx.send(Message::Binary(build_payload_frame(msg_id, &file_msg).into())).await?; - msg_id += 1; - let dev_msg = msg_device_audio_quality_changed( - sr as u64, - bits as u64, - ch as u64, - ); - ws_tx.send(Message::Binary(build_payload_frame(msg_id, &dev_msg).into())).await?; - msg_id += 1; - // Re-emit quality confirmation after successful restart - // so controllers observing the currently active stream update promptly. - let confirm = msg_max_audio_quality_changed(*quality as u64, None); - ws_tx.send(Message::Binary(build_payload_frame(msg_id, &confirm).into())).await?; - msg_id += 1; - current_buffer_state = 2; // OK(2) - info!("Restarted at format_id={}", format_id); + incoming = ws_read.next() => { + match incoming { + Some(Ok(WsMessage::Binary(data))) => { + match decode_frame(&data) { + Ok((MSG_TYPE_PAYLOAD, payload)) => { + match CloudPayload::decode(payload.as_slice()) { + Ok(cloud) => { + let inner = cloud.payload.unwrap_or_default(); + match QConnectMessages::decode(inner.as_slice()) { + Ok(batch) => { + handler.dispatch_inbound(batch).await; } Err(e) => { - error!("Failed to get stream URL for quality change: {}", e); - current_playing_state = 1; - current_buffer_state = 2; // OK + debug!("[QConnect] QConnectMessages decode error: {e}"); } } - send_state!(ws_tx, msg_id); + } + Err(e) => { + debug!("[QConnect] CloudPayload decode error: {e}"); } } - - QConnectCommand::SetLoopMode(mode) => { - info!("SetLoopMode: {}", mode); - let _ = mode; - // No response message — renderer stores setting, server notifies controllers directly - } - - QConnectCommand::SetShuffleMode(mode) => { - info!("SetShuffleMode: {}", mode); - let _ = mode; - // No response message — renderer stores setting, server notifies controllers directly - } - - QConnectCommand::Unknown(_) => {} + } + Ok((MSG_TYPE_AUTHENTICATE, _)) => { + debug!("[QConnect] Auth ack received"); + } + Ok((MSG_TYPE_SUBSCRIBE, _)) => { + debug!("[QConnect] Subscribe ack received"); + } + Ok((9, _)) => { + return Err(anyhow::anyhow!("server sent error frame")); + } + Ok((10, _)) => { + return Err(anyhow::anyhow!("server sent disconnect frame")); + } + Ok(_) => {} + Err(e) => { + warn!("[QConnect] Frame decode error: {e}"); } } } - Some(Ok(Message::Ping(data))) => { - ws_tx.send(Message::Pong(data)).await?; + Some(Ok(WsMessage::Ping(payload))) => { + let _ = ws_write.send(WsMessage::Pong(payload)).await; } - Some(Ok(Message::Close(_))) | None => { - bail!("WebSocket closed"); + Some(Ok(WsMessage::Pong(_))) => {} + Some(Ok(WsMessage::Close(_))) => { + return Err(anyhow::anyhow!("server closed connection")); } + Some(Ok(_)) => {} Some(Err(e)) => { - bail!("WebSocket error: {}", e); + return Err(anyhow::anyhow!("WebSocket read error: {e}")); } - _ => {} + None => { + return Err(anyhow::anyhow!("WebSocket stream ended")); + } + } + } + + Some(msg) = out_rx.recv() => { + let frame = build_payload_frame(msg, &mut msg_id); + if let Err(e) = ws_write.send(WsMessage::Binary(frame.into())).await { + return Err(anyhow::anyhow!("WebSocket write error: {e}")); } } } -- 2.49.1