add monitor sync jobs and duplicate-safe qobuz updates
This commit is contained in:
@@ -5,15 +5,18 @@ import (
|
||||
"context"
|
||||
"crypto/sha1"
|
||||
"encoding/hex"
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"os/signal"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"qtransfer/internal/config"
|
||||
"qtransfer/internal/jobconfig"
|
||||
"qtransfer/internal/match"
|
||||
"qtransfer/internal/model"
|
||||
"qtransfer/internal/qobuz"
|
||||
@@ -30,6 +33,11 @@ type sourceSelection struct {
|
||||
IncludeLiked bool
|
||||
}
|
||||
|
||||
type monitorPlan struct {
|
||||
SyncMode string
|
||||
TargetPlaylistID int64
|
||||
}
|
||||
|
||||
func main() {
|
||||
if err := run(); err != nil {
|
||||
fmt.Fprintf(os.Stderr, "Error: %v\n", err)
|
||||
@@ -54,6 +62,9 @@ func run() error {
|
||||
sess.Spotify = session.SpotifyState{}
|
||||
sess.Qobuz = session.QobuzState{}
|
||||
}
|
||||
if sess.Playlists == nil {
|
||||
sess.Playlists = map[string]session.PlaylistSyncRef{}
|
||||
}
|
||||
applySessionDefaults(&cfg, &sess)
|
||||
|
||||
if cfg.Command == "login" {
|
||||
@@ -69,6 +80,26 @@ func run() error {
|
||||
return err
|
||||
}
|
||||
|
||||
fileCfg := jobconfig.File{}
|
||||
if strings.TrimSpace(cfg.ConfigFile) != "" {
|
||||
loaded, err := jobconfig.Load(cfg.ConfigFile)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
fileCfg = loaded
|
||||
if err := applyFileGlobals(&cfg, fileCfg.Global); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
planURLs, monitorPlans, err := buildPlaylistPlans(cfg, fileCfg.Playlists)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(planURLs) > 0 {
|
||||
cfg.PlaylistURLs = planURLs
|
||||
}
|
||||
|
||||
spToken, err := authenticateSpotify(ctx, cfg, &sess)
|
||||
if err != nil {
|
||||
return fmt.Errorf("spotify auth failed: %w", err)
|
||||
@@ -86,7 +117,7 @@ func run() error {
|
||||
}
|
||||
|
||||
if cfg.Monitor {
|
||||
if err := runMonitorMode(ctx, cfg, sp, &sess, selection); err != nil {
|
||||
if err := runMonitorMode(ctx, cfg, sp, &sess, selection, monitorPlans); err != nil {
|
||||
return err
|
||||
}
|
||||
return persistSession(cfg, sess)
|
||||
@@ -264,17 +295,24 @@ func prompt(scanner *bufio.Scanner, label, defaultValue string) string {
|
||||
return v
|
||||
}
|
||||
|
||||
func runMonitorMode(ctx context.Context, cfg config.Config, sp *spotify.Client, sess *session.Data, selection sourceSelection) error {
|
||||
func runMonitorMode(ctx context.Context, cfg config.Config, sp *spotify.Client, sess *session.Data, selection sourceSelection, plans map[string]monitorPlan) error {
|
||||
if len(selection.Playlists) == 0 && !selection.IncludeLiked {
|
||||
return fmt.Errorf("monitor mode requires at least one playlist or --liked")
|
||||
}
|
||||
if sess.Playlists == nil {
|
||||
sess.Playlists = map[string]session.PlaylistSyncRef{}
|
||||
}
|
||||
|
||||
fmt.Println("Starting monitor mode...")
|
||||
fmt.Printf("Watching %d playlist(s)", len(selection.Playlists))
|
||||
if selection.IncludeLiked {
|
||||
fmt.Printf(" + liked songs")
|
||||
}
|
||||
fmt.Printf(" | interval=%s\n", cfg.MonitorInterval)
|
||||
fmt.Printf(" | interval=%s", cfg.MonitorInterval)
|
||||
if cfg.MonitorTransfer {
|
||||
fmt.Printf(" | sync-mode=%s", cfg.SyncMode)
|
||||
}
|
||||
fmt.Println()
|
||||
|
||||
playlistIDs := make([]string, 0, len(selection.Playlists))
|
||||
for _, p := range selection.Playlists {
|
||||
@@ -290,7 +328,7 @@ func runMonitorMode(ctx context.Context, cfg config.Config, sp *spotify.Client,
|
||||
return err
|
||||
}
|
||||
matcher = match.NewMatcher(qb)
|
||||
fmt.Println("Monitor transfer mode enabled: changed playlists will be transferred.")
|
||||
fmt.Println("Monitor transfer mode enabled: changed playlists will be synced to Qobuz.")
|
||||
}
|
||||
|
||||
prev := cloneMap(sess.Monitor)
|
||||
@@ -335,25 +373,24 @@ func runMonitorMode(ctx context.Context, cfg config.Config, sp *spotify.Client,
|
||||
fmt.Printf("[%s] Updated: %s\n", time.Now().Format("15:04:05"), strings.Join(names, ", "))
|
||||
|
||||
if cfg.MonitorTransfer && qb != nil && matcher != nil {
|
||||
transferCfg := transfer.Config{
|
||||
DryRun: cfg.DryRun,
|
||||
PublicPlaylists: cfg.PublicPlaylists,
|
||||
Concurrency: cfg.Concurrency,
|
||||
LikedName: cfg.LikedPlaylist,
|
||||
Progress: func(msg string) {
|
||||
fmt.Printf("\r%-140s", msg)
|
||||
},
|
||||
for _, pl := range changedPlaylists {
|
||||
key := "playlist:" + pl.SourceID
|
||||
plan := plans[pl.SourceID]
|
||||
if err := syncChangedPlaylist(ctx, cfg, qb, matcher, key, pl, plan, sess); err != nil {
|
||||
fmt.Printf("Sync error for %s: %v\n", pl.Name, err)
|
||||
curr[key] = prev[key]
|
||||
}
|
||||
}
|
||||
|
||||
likedToTransfer := []model.Track{}
|
||||
if likedChanged {
|
||||
likedToTransfer = currentLiked
|
||||
}
|
||||
if _, err := transfer.Run(ctx, transferCfg, qb, matcher, changedPlaylists, likedToTransfer, likedChanged); err != nil {
|
||||
return fmt.Errorf("monitor transfer failed: %w", err)
|
||||
plan := monitorPlan{SyncMode: cfg.SyncMode}
|
||||
likedPlaylist := model.Playlist{Name: cfg.LikedPlaylist, Tracks: currentLiked}
|
||||
if err := syncChangedPlaylist(ctx, cfg, qb, matcher, "liked", likedPlaylist, plan, sess); err != nil {
|
||||
fmt.Printf("Sync error for liked songs: %v\n", err)
|
||||
curr["liked"] = prev["liked"]
|
||||
}
|
||||
}
|
||||
fmt.Print("\r")
|
||||
fmt.Println("Monitor transfer cycle complete. ")
|
||||
fmt.Println("Monitor sync cycle complete. ")
|
||||
}
|
||||
|
||||
prev = curr
|
||||
@@ -525,6 +562,114 @@ func resolvePlaylistIDs(inputs []string) ([]string, error) {
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func applyFileGlobals(cfg *config.Config, g jobconfig.GlobalConfig) error {
|
||||
if g.Monitor != nil {
|
||||
cfg.Monitor = *g.Monitor
|
||||
}
|
||||
if g.MonitorOnce != nil {
|
||||
cfg.MonitorOnce = *g.MonitorOnce
|
||||
}
|
||||
if g.MonitorTransfer != nil {
|
||||
cfg.MonitorTransfer = *g.MonitorTransfer
|
||||
}
|
||||
if strings.TrimSpace(g.MonitorInterval) != "" {
|
||||
d, err := time.ParseDuration(strings.TrimSpace(g.MonitorInterval))
|
||||
if err != nil {
|
||||
return fmt.Errorf("invalid global monitor_interval %q: %w", g.MonitorInterval, err)
|
||||
}
|
||||
cfg.MonitorInterval = d
|
||||
}
|
||||
if strings.TrimSpace(g.SyncMode) != "" {
|
||||
cfg.SyncMode = strings.ToLower(strings.TrimSpace(g.SyncMode))
|
||||
}
|
||||
if g.IncludeLiked != nil {
|
||||
cfg.IncludeLiked = *g.IncludeLiked
|
||||
}
|
||||
if g.DryRun != nil {
|
||||
cfg.DryRun = *g.DryRun
|
||||
}
|
||||
if g.PublicPlaylists != nil {
|
||||
cfg.PublicPlaylists = *g.PublicPlaylists
|
||||
}
|
||||
if g.Concurrency != nil {
|
||||
cfg.Concurrency = *g.Concurrency
|
||||
}
|
||||
if strings.TrimSpace(g.ReportPath) != "" {
|
||||
cfg.ReportPath = strings.TrimSpace(g.ReportPath)
|
||||
}
|
||||
|
||||
return cfg.Validate()
|
||||
}
|
||||
|
||||
func buildPlaylistPlans(cfg config.Config, fileEntries []jobconfig.PlaylistEntry) ([]string, map[string]monitorPlan, error) {
|
||||
urls := make([]string, 0, len(cfg.PlaylistURLs)+len(fileEntries))
|
||||
for _, u := range cfg.PlaylistURLs {
|
||||
u = strings.TrimSpace(u)
|
||||
if u != "" {
|
||||
urls = append(urls, u)
|
||||
}
|
||||
}
|
||||
|
||||
plans := map[string]monitorPlan{}
|
||||
|
||||
for _, raw := range urls {
|
||||
id, err := spotify.ParsePlaylistID(raw)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("invalid playlist-url %q: %w", raw, err)
|
||||
}
|
||||
plans[id] = monitorPlan{SyncMode: cfg.SyncMode, TargetPlaylistID: cfg.TargetPlaylistID}
|
||||
}
|
||||
|
||||
for i, entry := range fileEntries {
|
||||
if !entry.IsEnabled() {
|
||||
continue
|
||||
}
|
||||
raw := strings.TrimSpace(entry.URL)
|
||||
id, err := spotify.ParsePlaylistID(raw)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("invalid config playlist entry %d url %q: %w", i+1, entry.URL, err)
|
||||
}
|
||||
mode := strings.ToLower(strings.TrimSpace(entry.SyncMode))
|
||||
if mode == "" {
|
||||
mode = cfg.SyncMode
|
||||
}
|
||||
if mode == "" {
|
||||
mode = "append"
|
||||
}
|
||||
if entry.TargetPlaylistID > 0 && mode == "mirror" {
|
||||
return nil, nil, fmt.Errorf("playlist entry %d cannot use mirror mode with target_playlist_id", i+1)
|
||||
}
|
||||
plans[id] = monitorPlan{SyncMode: mode, TargetPlaylistID: entry.TargetPlaylistID}
|
||||
urls = append(urls, raw)
|
||||
}
|
||||
|
||||
if cfg.TargetPlaylistID > 0 {
|
||||
uniqueIDs := map[string]struct{}{}
|
||||
for id := range plans {
|
||||
uniqueIDs[id] = struct{}{}
|
||||
}
|
||||
if len(uniqueIDs) != 1 {
|
||||
return nil, nil, fmt.Errorf("--target-playlist-id can only be used with exactly one source playlist URL")
|
||||
}
|
||||
}
|
||||
|
||||
seen := map[string]struct{}{}
|
||||
deduped := make([]string, 0, len(urls))
|
||||
for _, u := range urls {
|
||||
u = strings.TrimSpace(u)
|
||||
if u == "" {
|
||||
continue
|
||||
}
|
||||
if _, ok := seen[u]; ok {
|
||||
continue
|
||||
}
|
||||
seen[u] = struct{}{}
|
||||
deduped = append(deduped, u)
|
||||
}
|
||||
|
||||
return deduped, plans, nil
|
||||
}
|
||||
|
||||
func buildFingerprintMap(playlists []model.Playlist, liked []model.Track, includeLiked bool) map[string]string {
|
||||
m := make(map[string]string, len(playlists)+1)
|
||||
for _, p := range playlists {
|
||||
@@ -623,6 +768,9 @@ func persistSession(cfg config.Config, sess session.Data) error {
|
||||
if sess.Monitor == nil {
|
||||
sess.Monitor = map[string]string{}
|
||||
}
|
||||
if sess.Playlists == nil {
|
||||
sess.Playlists = map[string]session.PlaylistSyncRef{}
|
||||
}
|
||||
return session.Save(cfg.SessionPath, sess)
|
||||
}
|
||||
|
||||
@@ -634,6 +782,168 @@ func cloneMap(in map[string]string) map[string]string {
|
||||
return out
|
||||
}
|
||||
|
||||
func syncChangedPlaylist(ctx context.Context, cfg config.Config, qb *qobuz.Client, matcher *match.Matcher, key string, pl model.Playlist, plan monitorPlan, sess *session.Data) error {
|
||||
mode := strings.ToLower(strings.TrimSpace(plan.SyncMode))
|
||||
if mode == "" {
|
||||
mode = cfg.SyncMode
|
||||
}
|
||||
if mode == "" {
|
||||
mode = "append"
|
||||
}
|
||||
if plan.TargetPlaylistID > 0 && mode == "mirror" {
|
||||
return fmt.Errorf("mirror mode is not supported with explicit target playlist id (%d)", plan.TargetPlaylistID)
|
||||
}
|
||||
|
||||
prev := sess.Playlists[key]
|
||||
matchedIDs, unmatched := matchPlaylistTracks(ctx, matcher, pl.Tracks, cfg.Concurrency, func(done, total int) {
|
||||
fmt.Printf("\r%-140s", fmt.Sprintf("Matching %s (%d/%d)", pl.Name, done, total))
|
||||
})
|
||||
matchedIDs = uniqueIDs(matchedIDs)
|
||||
fmt.Print("\r")
|
||||
|
||||
fingerprint := playlistFingerprint(pl)
|
||||
if key == "liked" {
|
||||
fingerprint = trackListFingerprint(pl.Tracks)
|
||||
}
|
||||
|
||||
if cfg.DryRun {
|
||||
fmt.Printf("Dry-run sync %s: matched=%d/%d unmatched=%d\n", pl.Name, len(matchedIDs), len(pl.Tracks), len(unmatched))
|
||||
sess.Playlists[key] = session.PlaylistSyncRef{
|
||||
SourceName: pl.Name,
|
||||
QobuzPlaylistID: prev.QobuzPlaylistID,
|
||||
Fingerprint: fingerprint,
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
targetID := prev.QobuzPlaylistID
|
||||
if plan.TargetPlaylistID > 0 {
|
||||
targetID = plan.TargetPlaylistID
|
||||
}
|
||||
if mode == "mirror" && targetID > 0 {
|
||||
if err := qb.DeletePlaylist(ctx, targetID); err != nil {
|
||||
fmt.Printf("Warning: failed deleting old playlist %d for mirror sync: %v\n", targetID, err)
|
||||
}
|
||||
targetID = 0
|
||||
}
|
||||
|
||||
if targetID == 0 {
|
||||
createdID, err := qb.CreatePlaylist(ctx, pl.Name, sanitizeDescription(pl.Description), cfg.PublicPlaylists)
|
||||
if err != nil {
|
||||
return fmt.Errorf("create qobuz playlist: %w", err)
|
||||
}
|
||||
targetID = createdID
|
||||
}
|
||||
|
||||
if len(matchedIDs) > 0 {
|
||||
if err := qb.AddTracksToPlaylist(ctx, targetID, matchedIDs); err != nil {
|
||||
if errors.Is(err, qobuz.ErrDuplicateTracks) {
|
||||
fmt.Printf("No new tracks added for %s: matched tracks already exist in Qobuz playlist %d.\n", pl.Name, targetID)
|
||||
} else {
|
||||
return fmt.Errorf("add tracks to qobuz playlist %d: %w", targetID, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
sess.Playlists[key] = session.PlaylistSyncRef{
|
||||
SourceName: pl.Name,
|
||||
QobuzPlaylistID: targetID,
|
||||
Fingerprint: fingerprint,
|
||||
}
|
||||
|
||||
fmt.Printf("Synced %s (%s): qobuz=%d matched=%d/%d unmatched=%d\n", pl.Name, mode, targetID, len(matchedIDs), len(pl.Tracks), len(unmatched))
|
||||
return nil
|
||||
}
|
||||
|
||||
func matchPlaylistTracks(ctx context.Context, matcher *match.Matcher, tracks []model.Track, concurrency int, progress func(done, total int)) ([]int64, []model.MatchedTrack) {
|
||||
if concurrency < 1 {
|
||||
concurrency = 1
|
||||
}
|
||||
|
||||
type job struct {
|
||||
idx int
|
||||
trk model.Track
|
||||
}
|
||||
type out struct {
|
||||
idx int
|
||||
res model.MatchedTrack
|
||||
}
|
||||
|
||||
jobs := make(chan job)
|
||||
results := make(chan out)
|
||||
|
||||
var wg sync.WaitGroup
|
||||
for i := 0; i < concurrency; i++ {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for j := range jobs {
|
||||
m := matcher.MatchTrack(ctx, j.trk)
|
||||
results <- out{idx: j.idx, res: m}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
go func() {
|
||||
for i, t := range tracks {
|
||||
jobs <- job{idx: i, trk: t}
|
||||
}
|
||||
close(jobs)
|
||||
wg.Wait()
|
||||
close(results)
|
||||
}()
|
||||
|
||||
ordered := make([]model.MatchedTrack, len(tracks))
|
||||
total := len(tracks)
|
||||
step := 1
|
||||
if total > 100 {
|
||||
step = total / 100
|
||||
}
|
||||
done := 0
|
||||
for r := range results {
|
||||
ordered[r.idx] = r.res
|
||||
done++
|
||||
if progress != nil && (done == 1 || done == total || done%step == 0) {
|
||||
progress(done, total)
|
||||
}
|
||||
}
|
||||
|
||||
matched := make([]int64, 0, len(tracks))
|
||||
unmatched := make([]model.MatchedTrack, 0)
|
||||
for _, r := range ordered {
|
||||
if r.Matched && r.QobuzID > 0 {
|
||||
matched = append(matched, r.QobuzID)
|
||||
} else {
|
||||
unmatched = append(unmatched, r)
|
||||
}
|
||||
}
|
||||
return matched, unmatched
|
||||
}
|
||||
|
||||
func uniqueIDs(ids []int64) []int64 {
|
||||
seen := map[int64]struct{}{}
|
||||
out := make([]int64, 0, len(ids))
|
||||
for _, id := range ids {
|
||||
if id == 0 {
|
||||
continue
|
||||
}
|
||||
if _, ok := seen[id]; ok {
|
||||
continue
|
||||
}
|
||||
seen[id] = struct{}{}
|
||||
out = append(out, id)
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
func sanitizeDescription(s string) string {
|
||||
s = strings.TrimSpace(s)
|
||||
if len(s) <= 1000 {
|
||||
return s
|
||||
}
|
||||
return s[:1000]
|
||||
}
|
||||
|
||||
func printSummary(rep model.TransferReport, reportPath string, elapsed time.Duration, dryRun bool) {
|
||||
mode := "TRANSFER"
|
||||
if dryRun {
|
||||
|
||||
Reference in New Issue
Block a user