Fix server-driven seek handling and stream re-seek reliability
This commit is contained in:
@@ -441,7 +441,7 @@ fn extract_qconnect_messages(frame_body: &[u8]) -> Vec<(u32, Vec<u8>)> {
|
||||
pub enum QConnectCommand {
|
||||
SetState {
|
||||
playing_state: Option<u32>, // None = not set (keep current), Some(1)=stopped, Some(2)=playing, Some(3)=paused
|
||||
position_ms: u32,
|
||||
position_ms: Option<u32>, // None = field not present
|
||||
current_track: Option<TrackRef>,
|
||||
next_track: Option<TrackRef>,
|
||||
queue_version_major: u32,
|
||||
@@ -496,7 +496,7 @@ fn parse_incoming_commands(data: &[u8]) -> Vec<QConnectCommand> {
|
||||
41 => {
|
||||
let fields = parse_fields(&payload);
|
||||
let playing_state = get_varint_field(&fields, 1).map(|v| v as u32); // None = not present
|
||||
let position_ms = get_varint_field(&fields, 2).unwrap_or(0) as u32;
|
||||
let position_ms = get_varint_field(&fields, 2).map(|v| v as u32);
|
||||
let queue_version_major = get_bytes_field(&fields, 3)
|
||||
.map(|qv| {
|
||||
let qvf = parse_fields(qv);
|
||||
@@ -506,7 +506,7 @@ fn parse_incoming_commands(data: &[u8]) -> Vec<QConnectCommand> {
|
||||
let current_track = get_bytes_field(&fields, 4).map(parse_queue_track);
|
||||
let next_track = get_bytes_field(&fields, 5).map(parse_queue_track);
|
||||
|
||||
info!("[RECV] SET_STATE: playing_state={:?}, position_ms={}, current_track={:?}, next_track={:?}, queue_ver={}",
|
||||
info!("[RECV] SET_STATE: playing_state={:?}, position_ms={:?}, current_track={:?}, next_track={:?}, queue_ver={}",
|
||||
playing_state, position_ms, current_track, next_track, queue_version_major);
|
||||
|
||||
QConnectCommand::SetState {
|
||||
@@ -939,20 +939,31 @@ async fn run_connection(
|
||||
} => {
|
||||
info!("[STATE] SET_STATE: playing_state={:?} current_track={:?} next_track={:?} pos={}",
|
||||
playing_state, current_track.as_ref().map(|t| t.track_id),
|
||||
next_track.as_ref().map(|t| t.track_id), position_ms);
|
||||
next_track.as_ref().map(|t| t.track_id), position_ms.unwrap_or(0));
|
||||
|
||||
let requested_pos = position_ms.map(|p| p as u64);
|
||||
|
||||
let seek_only_state = playing_state.is_none()
|
||||
&& current_track.is_none()
|
||||
&& next_track.is_none()
|
||||
&& *queue_version_major == 0
|
||||
&& *position_ms > 0;
|
||||
&& requested_pos.is_some();
|
||||
|
||||
if seek_only_state {
|
||||
let target_pos = *position_ms as u64;
|
||||
let target_pos = requested_pos.unwrap_or(0);
|
||||
let status = player.status();
|
||||
let current_player_pos = status.position_ms;
|
||||
let should_seek = status.state != PlayerState::Stopped
|
||||
&& target_pos.abs_diff(current_player_pos) > 800;
|
||||
let should_seek = target_pos == 0
|
||||
|| target_pos.abs_diff(current_player_pos) > 350;
|
||||
|
||||
info!(
|
||||
"[STATE] seek-only command: target={}ms local={}ms state={:?} track={} should_seek={}",
|
||||
target_pos,
|
||||
current_player_pos,
|
||||
status.state,
|
||||
status.track_id,
|
||||
should_seek
|
||||
);
|
||||
|
||||
if should_seek {
|
||||
info!(
|
||||
@@ -961,11 +972,6 @@ async fn run_connection(
|
||||
);
|
||||
player.send(PlayerCommand::Seek(target_pos));
|
||||
track_ended = false;
|
||||
|
||||
// ACTION_TYPE_SEEK = 8
|
||||
let action_msg = msg_renderer_action(8, Some(*position_ms));
|
||||
ws_tx.send(Message::Binary(build_payload_frame(msg_id, &action_msg).into())).await?;
|
||||
msg_id += 1;
|
||||
}
|
||||
|
||||
current_position_ms = target_pos;
|
||||
@@ -987,7 +993,7 @@ async fn run_connection(
|
||||
current_queue_item_id = track.queue_item_id;
|
||||
current_playing_state = 2;
|
||||
current_buffer_state = 1; // BUFFERING
|
||||
current_position_ms = *position_ms as u64;
|
||||
current_position_ms = requested_pos.unwrap_or(0);
|
||||
current_duration_ms = 0;
|
||||
last_play_command_at = std::time::Instant::now();
|
||||
has_seen_position_progress = false;
|
||||
@@ -1009,7 +1015,7 @@ async fn run_connection(
|
||||
track_id: track.track_id,
|
||||
queue_item_id: track.queue_item_id,
|
||||
duration_ms,
|
||||
start_position_ms: *position_ms as u64,
|
||||
start_position_ms: requested_pos.unwrap_or(0),
|
||||
});
|
||||
current_buffer_state = 2; // OK
|
||||
}
|
||||
@@ -1042,8 +1048,8 @@ async fn run_connection(
|
||||
info!("[STATE] Pausing playback");
|
||||
player.send(PlayerCommand::Pause);
|
||||
current_playing_state = 3;
|
||||
if *position_ms > 0 {
|
||||
current_position_ms = *position_ms as u64;
|
||||
if let Some(pos) = requested_pos {
|
||||
current_position_ms = pos;
|
||||
} else {
|
||||
current_position_ms = player.status().position_ms;
|
||||
}
|
||||
@@ -1065,20 +1071,39 @@ async fn run_connection(
|
||||
|
||||
// 4. Apply seek position if provided and not loading new track
|
||||
let is_pause = matches!(playing_state, Some(3));
|
||||
if !loaded_new_track && *position_ms > 0 && !is_pause {
|
||||
let requested = *position_ms as u64;
|
||||
let position_control_state = *queue_version_major == 0
|
||||
&& current_track.is_none()
|
||||
&& next_track.is_none()
|
||||
&& requested_pos.is_some();
|
||||
if !loaded_new_track && !is_pause && position_control_state {
|
||||
let requested = requested_pos.unwrap_or(0);
|
||||
let status = player.status();
|
||||
let local = status.position_ms;
|
||||
if status.state != PlayerState::Stopped && requested.abs_diff(local) > 1500 {
|
||||
info!("[STATE] Position jump detected, seeking to {}ms (local={}ms)", requested, local);
|
||||
let should_seek = requested == 0
|
||||
|| requested.abs_diff(local) > 350;
|
||||
info!(
|
||||
"[STATE] position-control command: playing_state={:?} target={}ms local={}ms state={:?} track={} should_seek={}",
|
||||
playing_state,
|
||||
requested,
|
||||
local,
|
||||
status.state,
|
||||
status.track_id,
|
||||
should_seek
|
||||
);
|
||||
|
||||
if should_seek {
|
||||
info!(
|
||||
"[STATE] Position jump detected, seeking to {}ms (local={}ms)",
|
||||
requested, local
|
||||
);
|
||||
player.send(PlayerCommand::Seek(requested));
|
||||
track_ended = false;
|
||||
// ACTION_TYPE_SEEK = 8
|
||||
let action_msg = msg_renderer_action(8, Some(*position_ms));
|
||||
ws_tx.send(Message::Binary(build_payload_frame(msg_id, &action_msg).into())).await?;
|
||||
msg_id += 1;
|
||||
}
|
||||
current_position_ms = requested;
|
||||
} else if !loaded_new_track && !is_pause {
|
||||
if let Some(pos) = requested_pos {
|
||||
current_position_ms = pos;
|
||||
}
|
||||
}
|
||||
|
||||
// 5. Always send state update (like reference implementation)
|
||||
|
||||
Reference in New Issue
Block a user