build spotify-to-navidrome migrator with recovery flow
This commit is contained in:
206
internal/transfer/transfer.go
Normal file
206
internal/transfer/transfer.go
Normal file
@@ -0,0 +1,206 @@
|
||||
package transfer
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"navimigrate/internal/model"
|
||||
)
|
||||
|
||||
type Writer interface {
|
||||
CreatePlaylist(ctx context.Context, name string) (string, error)
|
||||
AddTracksToPlaylist(ctx context.Context, playlistID string, trackIDs []string) error
|
||||
}
|
||||
|
||||
type TrackMatcher interface {
|
||||
MatchTrack(ctx context.Context, src model.Track) model.MatchedTrack
|
||||
}
|
||||
|
||||
type Config struct {
|
||||
DryRun bool
|
||||
Concurrency int
|
||||
Progress ProgressFunc
|
||||
}
|
||||
|
||||
type ProgressFunc func(message string)
|
||||
|
||||
func Run(ctx context.Context, cfg Config, writer Writer, matcher TrackMatcher, playlists []model.Playlist) (model.TransferReport, error) {
|
||||
rep := model.TransferReport{
|
||||
StartedAt: time.Now().UTC().Format(time.RFC3339),
|
||||
DryRun: cfg.DryRun,
|
||||
}
|
||||
|
||||
totalPlaylists := len(playlists)
|
||||
for i, pl := range playlists {
|
||||
result := processPlaylist(ctx, cfg, writer, matcher, pl, i+1, totalPlaylists)
|
||||
rep.Results = append(rep.Results, result)
|
||||
notify(cfg, fmt.Sprintf(
|
||||
"Transfer %d/%d done: %s | matched %d/%d | added %d | unmatched %d",
|
||||
i+1,
|
||||
totalPlaylists,
|
||||
shortName(pl.Name),
|
||||
result.MatchedTracks,
|
||||
result.TotalTracks,
|
||||
result.AddedTracks,
|
||||
len(result.Unmatched),
|
||||
))
|
||||
}
|
||||
|
||||
rep.EndedAt = time.Now().UTC().Format(time.RFC3339)
|
||||
notify(cfg, "Transfer processing complete")
|
||||
return rep, nil
|
||||
}
|
||||
|
||||
func processPlaylist(ctx context.Context, cfg Config, writer Writer, matcher TrackMatcher, pl model.Playlist, playlistIndex, playlistTotal int) model.PlaylistTransferResult {
|
||||
res := model.PlaylistTransferResult{
|
||||
Name: pl.Name,
|
||||
TotalTracks: len(pl.Tracks),
|
||||
Errors: []string{},
|
||||
Unmatched: []model.MatchedTrack{},
|
||||
}
|
||||
|
||||
notify(cfg, fmt.Sprintf("Transfer %d/%d matching: %s (0/%d)", playlistIndex, playlistTotal, shortName(pl.Name), len(pl.Tracks)))
|
||||
|
||||
matched, unmatched := matchTracks(ctx, matcher, pl.Tracks, cfg.Concurrency, func(done, total int) {
|
||||
notify(cfg, fmt.Sprintf("Transfer %d/%d matching: %s (%d/%d)", playlistIndex, playlistTotal, shortName(pl.Name), done, total))
|
||||
})
|
||||
res.MatchedTracks = len(matched)
|
||||
res.Unmatched = unmatched
|
||||
|
||||
if cfg.DryRun {
|
||||
res.AddedTracks = len(uniqueIDs(matched))
|
||||
notify(cfg, fmt.Sprintf("Transfer %d/%d dry-run: %s resolved %d matches", playlistIndex, playlistTotal, shortName(pl.Name), res.AddedTracks))
|
||||
return res
|
||||
}
|
||||
|
||||
notify(cfg, fmt.Sprintf("Transfer %d/%d creating playlist: %s", playlistIndex, playlistTotal, shortName(pl.Name)))
|
||||
playlistID, err := writer.CreatePlaylist(ctx, pl.Name)
|
||||
if err != nil {
|
||||
res.Errors = append(res.Errors, fmt.Sprintf("create playlist: %v", err))
|
||||
notify(cfg, fmt.Sprintf("Transfer %d/%d failed creating playlist: %s", playlistIndex, playlistTotal, shortName(pl.Name)))
|
||||
return res
|
||||
}
|
||||
res.TargetID = playlistID
|
||||
|
||||
ids := uniqueIDs(matched)
|
||||
if len(ids) == 0 {
|
||||
notify(cfg, fmt.Sprintf("Transfer %d/%d no matched tracks to add: %s", playlistIndex, playlistTotal, shortName(pl.Name)))
|
||||
return res
|
||||
}
|
||||
|
||||
notify(cfg, fmt.Sprintf("Transfer %d/%d adding %d track(s): %s", playlistIndex, playlistTotal, len(ids), shortName(pl.Name)))
|
||||
if err := writer.AddTracksToPlaylist(ctx, playlistID, ids); err != nil {
|
||||
res.Errors = append(res.Errors, fmt.Sprintf("add tracks: %v", err))
|
||||
notify(cfg, fmt.Sprintf("Transfer %d/%d failed adding tracks: %s", playlistIndex, playlistTotal, shortName(pl.Name)))
|
||||
return res
|
||||
}
|
||||
|
||||
res.AddedTracks = len(ids)
|
||||
notify(cfg, fmt.Sprintf("Transfer %d/%d added %d track(s): %s", playlistIndex, playlistTotal, res.AddedTracks, shortName(pl.Name)))
|
||||
return res
|
||||
}
|
||||
|
||||
func matchTracks(ctx context.Context, matcher TrackMatcher, tracks []model.Track, concurrency int, progress func(done, total int)) ([]string, []model.MatchedTrack) {
|
||||
if concurrency < 1 {
|
||||
concurrency = 1
|
||||
}
|
||||
|
||||
type job struct {
|
||||
idx int
|
||||
trk model.Track
|
||||
}
|
||||
type out struct {
|
||||
idx int
|
||||
res model.MatchedTrack
|
||||
}
|
||||
|
||||
jobs := make(chan job)
|
||||
results := make(chan out)
|
||||
|
||||
var wg sync.WaitGroup
|
||||
for i := 0; i < concurrency; i++ {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for j := range jobs {
|
||||
m := matcher.MatchTrack(ctx, j.trk)
|
||||
results <- out{idx: j.idx, res: m}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
go func() {
|
||||
for i, t := range tracks {
|
||||
jobs <- job{idx: i, trk: t}
|
||||
}
|
||||
close(jobs)
|
||||
wg.Wait()
|
||||
close(results)
|
||||
}()
|
||||
|
||||
ordered := make([]model.MatchedTrack, len(tracks))
|
||||
total := len(tracks)
|
||||
step := 1
|
||||
if total > 100 {
|
||||
step = total / 100
|
||||
}
|
||||
done := 0
|
||||
for r := range results {
|
||||
ordered[r.idx] = r.res
|
||||
done++
|
||||
if progress != nil {
|
||||
if done == 1 || done == total || done%step == 0 {
|
||||
progress(done, total)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
matched := make([]string, 0, len(tracks))
|
||||
unmatched := make([]model.MatchedTrack, 0)
|
||||
for _, r := range ordered {
|
||||
if r.Matched && strings.TrimSpace(r.TargetID) != "" {
|
||||
matched = append(matched, r.TargetID)
|
||||
} else {
|
||||
unmatched = append(unmatched, r)
|
||||
}
|
||||
}
|
||||
return matched, unmatched
|
||||
}
|
||||
|
||||
func uniqueIDs(ids []string) []string {
|
||||
seen := map[string]struct{}{}
|
||||
out := make([]string, 0, len(ids))
|
||||
for _, id := range ids {
|
||||
id = strings.TrimSpace(id)
|
||||
if id == "" {
|
||||
continue
|
||||
}
|
||||
if _, ok := seen[id]; ok {
|
||||
continue
|
||||
}
|
||||
seen[id] = struct{}{}
|
||||
out = append(out, id)
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
func notify(cfg Config, msg string) {
|
||||
if cfg.Progress != nil {
|
||||
cfg.Progress(msg)
|
||||
}
|
||||
}
|
||||
|
||||
func shortName(s string) string {
|
||||
const limit = 48
|
||||
s = strings.TrimSpace(s)
|
||||
if len(s) <= limit {
|
||||
return s
|
||||
}
|
||||
if limit <= 3 {
|
||||
return s[:limit]
|
||||
}
|
||||
return s[:limit-3] + "..."
|
||||
}
|
||||
Reference in New Issue
Block a user