From c1135eb8a075971272cc3a28ab25eb2a7ae676fc Mon Sep 17 00:00:00 2001 From: Pirates IRC <98669745+PiratesIRC@users.noreply.github.com> Date: Mon, 9 Mar 2026 04:17:59 -0500 Subject: [PATCH] Compatibility for Dispatcharr 0.2x --- Stream-Mapparr/fuzzy_matcher.py | 24 +- Stream-Mapparr/plugin.json | 48 +- Stream-Mapparr/plugin.py | 1100 +++++-------------------------- 3 files changed, 218 insertions(+), 954 deletions(-) diff --git a/Stream-Mapparr/fuzzy_matcher.py b/Stream-Mapparr/fuzzy_matcher.py index 5ec8ab3..8fc97de 100644 --- a/Stream-Mapparr/fuzzy_matcher.py +++ b/Stream-Mapparr/fuzzy_matcher.py @@ -12,7 +12,7 @@ import unicodedata from glob import glob # Version: YY.DDD.HHMM (Julian date format: Year.DayOfYear.Time) -__version__ = "25.358.0200" +__version__ = "26.018.0100" # Setup logging LOGGER = logging.getLogger("plugins.fuzzy_matcher") @@ -346,17 +346,26 @@ class FuzzyMatcher: # Store original for logging original_name = name - # CRITICAL FIX: Normalize spacing around numbers FIRST, before any other processing + # CRITICAL FIX (v25.019.0100): Apply quality patterns FIRST, before space normalization + # This prevents space normalization from breaking quality tags like "4K" -> "4 K" + # which would then fail to match quality patterns looking for "4K" + # Bug: Streams with "4K" suffix were not matching because "4K" was split to "4 K" + # by the space normalization step, then quality patterns couldn't find "4K" at end + if ignore_quality: + for pattern in QUALITY_PATTERNS: + name = re.sub(pattern, '', name, flags=re.IGNORECASE) + + # Normalize spacing around numbers (AFTER quality patterns are removed) # This ensures "ITV1" and "ITV 1" are treated identically during matching # Pattern: Insert space before number if preceded by letter, and after number if followed by letter # Examples: "ITV1" -> "ITV 1", "BBC2" -> "BBC 2", "E4" -> "E 4" name = re.sub(r'([a-zA-Z])(\d)', r'\1 \2', name) # Letter followed by digit name = re.sub(r'(\d)([a-zA-Z])', r'\1 \2', name) # Digit followed by letter - - # CRITICAL FIX: Normalize hyphens to spaces for better token matching + + # Normalize hyphens to spaces for better token matching # This ensures "UK-ITV" becomes "UK ITV" and matches properly # Common patterns: "UK-ITV 1", "US-CNN", etc. - name = re.sub(r'-', ' ', name) # Digit followed by letter + name = re.sub(r'-', ' ', name) # Remove ALL leading parenthetical prefixes like (US) (PRIME2), (SP2), (D1), etc. # Loop until no more leading parentheses are found @@ -386,11 +395,10 @@ class FuzzyMatcher: name = re.sub(r'\bCinemax\b\s*', '', name, flags=re.IGNORECASE) # Build list of patterns to apply based on category flags + # NOTE: Quality patterns are now applied earlier (before space normalization) + # to prevent "4K" from being split to "4 K" before removal patterns_to_apply = [] - if ignore_quality: - patterns_to_apply.extend(QUALITY_PATTERNS) - if ignore_regional: patterns_to_apply.extend(REGIONAL_PATTERNS) diff --git a/Stream-Mapparr/plugin.json b/Stream-Mapparr/plugin.json index 90ad3ec..21800da 100644 --- a/Stream-Mapparr/plugin.json +++ b/Stream-Mapparr/plugin.json @@ -1,16 +1,44 @@ { "name": "Stream-Mapparr", - "key": "stream-mapparr", - "module": "stream_mapparr.plugin", - "class": "Plugin", + "version": "0.8.0a", "description": "Automatically add matching streams to channels based on name similarity and quality precedence. Supports unlimited stream matching, channel visibility management, and CSV export cleanup.", "author": "community", - "homepage": "https://github.com/PiratesIRC/Stream-Mapparr", + "help_url": "https://github.com/PiratesIRC/Stream-Mapparr", + "fields": [ + {"id": "overwrite_streams", "label": "Overwrite Existing Streams", "type": "boolean", "default": true}, + {"id": "fuzzy_match_threshold", "label": "Fuzzy Match Threshold", "type": "number", "default": 85}, + {"id": "profile_name", "label": "Profile Name", "type": "string", "default": ""}, + {"id": "selected_groups", "label": "Channel Groups", "type": "string", "default": ""}, + {"id": "selected_stream_groups", "label": "Stream Groups", "type": "string", "default": ""}, + {"id": "selected_m3us", "label": "M3U Sources", "type": "string", "default": ""}, + {"id": "prioritize_quality", "label": "Prioritize Quality Before Source", "type": "boolean", "default": false}, + {"id": "ignore_tags", "label": "Ignore Tags", "type": "string", "default": ""}, + {"id": "ignore_quality_tags", "label": "Ignore Quality Tags", "type": "boolean", "default": true}, + {"id": "ignore_regional_tags", "label": "Ignore Regional Tags", "type": "boolean", "default": true}, + {"id": "ignore_geographic_tags", "label": "Ignore Geographic Tags", "type": "boolean", "default": true}, + {"id": "ignore_misc_tags", "label": "Ignore Misc Tags", "type": "boolean", "default": true}, + {"id": "visible_channel_limit", "label": "Visible Channel Limit", "type": "number", "default": 1}, + {"id": "rate_limiting", "label": "Rate Limiting", "type": "select", "default": "none"}, + {"id": "timezone", "label": "Timezone", "type": "select", "default": "US/Central"}, + {"id": "filter_dead_streams", "label": "Filter Dead Streams", "type": "boolean", "default": false}, + {"id": "wait_for_iptv_checker", "label": "Wait for IPTV Checker", "type": "boolean", "default": false}, + {"id": "iptv_checker_max_wait_hours", "label": "IPTV Checker Max Wait", "type": "number", "default": 6}, + {"id": "dry_run_mode", "label": "Dry Run Mode", "type": "boolean", "default": false}, + {"id": "scheduled_times", "label": "Scheduled Run Times", "type": "string", "default": ""}, + {"id": "scheduled_sort_streams", "label": "Schedule Sort Streams", "type": "boolean", "default": false}, + {"id": "scheduled_match_streams", "label": "Schedule Match Streams", "type": "boolean", "default": true}, + {"id": "enable_scheduled_csv_export", "label": "Enable CSV Export", "type": "boolean", "default": true} + ], "actions": [ - "load_process_channels", - "preview_changes", - "add_streams_to_channels", - "manage_channel_visibility", - "clear_csv_exports" + {"id": "validate_settings", "label": "Validate Settings", "description": "Check database connectivity, profiles, groups, and channel databases"}, + {"id": "update_schedule", "label": "Update Schedule", "description": "Save settings and restart background scheduler"}, + {"id": "load_process_channels", "label": "Load/Process Channels", "description": "Load channel and stream data from database"}, + {"id": "preview_changes", "label": "Preview Changes", "description": "Generate CSV preview without making changes"}, + {"id": "add_streams_to_channels", "label": "Match & Assign Streams", "description": "Match and assign streams to channels"}, + {"id": "match_us_ota_only", "label": "Match US OTA Only", "description": "Match US OTA channels by callsign"}, + {"id": "sort_streams", "label": "Sort Alternate Streams", "description": "Sort existing alternate streams by quality"}, + {"id": "manage_channel_visibility", "label": "Manage Channel Visibility", "description": "Enable/disable channels based on stream count"}, + {"id": "clear_csv_exports", "label": "Clear CSV Exports", "description": "Delete all plugin CSV export files"}, + {"id": "clear_operation_lock", "label": "Clear Operation Lock", "description": "Manually clear stuck operation lock"} ] -} \ No newline at end of file +} diff --git a/Stream-Mapparr/plugin.py b/Stream-Mapparr/plugin.py index 0719f49..11b9a66 100644 --- a/Stream-Mapparr/plugin.py +++ b/Stream-Mapparr/plugin.py @@ -8,11 +8,9 @@ import json import csv import os import re -import requests import urllib.request import urllib.error import time -import random import pytz from datetime import datetime, timedelta from django.utils import timezone @@ -24,21 +22,16 @@ from .fuzzy_matcher import FuzzyMatcher # Import fuzzy_matcher version for CSV header from . import fuzzy_matcher -# Django model imports - same approach as Event Channel Managarr -from apps.channels.models import Channel, ChannelProfileMembership, ChannelStream, Stream +# Django model imports - direct ORM access (plugins run inside Django backend) +from apps.channels.models import Channel, ChannelGroup, ChannelProfile, ChannelProfileMembership, ChannelStream, Stream +from core.utils import send_websocket_update # Background scheduling globals _bg_thread = None _stop_event = threading.Event() -# Setup logging using Dispatcharr's format +# Setup logging - Dispatcharr provides a pre-configured logger via context LOGGER = logging.getLogger("plugins.stream_mapparr") -if not LOGGER.handlers: - handler = logging.StreamHandler() - formatter = logging.Formatter("%(levelname)s %(name)s %(message)s") - handler.setFormatter(formatter) - LOGGER.addHandler(handler) -LOGGER.setLevel(logging.INFO) # ============================================================================ # CONFIGURATION DEFAULTS - Modify these values to change plugin defaults @@ -57,7 +50,7 @@ class PluginConfig: - Matching Settings: Fuzzy matching behavior - Tag Filtering: Which tag types to ignore by default - Profile & Group Settings: Default profile/group selections - - API Settings: Timeouts, rate limiting, caching + - Rate Limiting: Operation pacing settings - Scheduling Settings: Timezone, schedule format, CSV export - Cache Settings: How long to cache data - File Paths: Where to store data files @@ -65,7 +58,7 @@ class PluginConfig: """ # === PLUGIN METADATA === - PLUGIN_VERSION = "0.7.3" + PLUGIN_VERSION = "0.8.0a" FUZZY_MATCHER_MIN_VERSION = "25.358.0200" # Requires custom ignore tags Unicode fix # === MATCHING SETTINGS === @@ -87,18 +80,12 @@ class PluginConfig: DEFAULT_SELECTED_M3US = "" # Empty = all M3U sources DEFAULT_PRIORITIZE_QUALITY = False # When true, sort quality before M3U source priority - # === API SETTINGS === - DEFAULT_DISPATCHARR_URL = "" # Required by user + # === RATE LIMITING DELAYS (seconds) - used for pacing ORM operations === DEFAULT_RATE_LIMITING = "none" # Options: none, low, medium, high - API_REQUEST_TIMEOUT = 30 # Seconds for API requests - API_TOKEN_CACHE_DURATION = 30 # Minutes to cache API token - - # === RATE LIMITING DELAYS (seconds) === RATE_LIMIT_NONE = 0.0 # No rate limiting - RATE_LIMIT_LOW = 0.1 # 10 requests/second - RATE_LIMIT_MEDIUM = 0.5 # 2 requests/second - RATE_LIMIT_HIGH = 2.0 # 1 request/2 seconds - RATE_LIMIT_MAX_BACKOFF = 60 # Maximum exponential backoff delay + RATE_LIMIT_LOW = 0.1 # 10 operations/second + RATE_LIMIT_MEDIUM = 0.5 # 2 operations/second + RATE_LIMIT_HIGH = 2.0 # 1 operation/2 seconds # === SCHEDULING SETTINGS === DEFAULT_TIMEZONE = "US/Central" # Default timezone for scheduled runs @@ -291,8 +278,7 @@ class ProgressTracker: class SmartRateLimiter: """ - Handles rate limiting with exponential backoff for 429/5xx errors. - Uses standard library only. + Handles rate limiting with configurable delays to pace ORM write operations. """ def __init__(self, setting_value="medium", logger=None): self.logger = logger @@ -308,33 +294,10 @@ class SmartRateLimiter: else: self.base_delay = PluginConfig.RATE_LIMIT_MEDIUM - self.current_delay = self.base_delay - self.consecutive_errors = 0 - def wait(self): - """Call this before making a request""" - if not self.disabled and self.current_delay > 0: - time.sleep(self.current_delay) - - def report_success(self): - """Call this after a successful 200 OK request""" - self.consecutive_errors = 0 - if self.current_delay > self.base_delay: - self.current_delay = max(self.base_delay, self.current_delay / 2) - - def report_error(self, status_code): - """Call this when an API request fails""" - self.consecutive_errors += 1 - - if status_code == 429 or status_code >= 500: - backoff = min(PluginConfig.RATE_LIMIT_MAX_BACKOFF, self.base_delay * (2 ** self.consecutive_errors)) - jitter = backoff * 0.1 * random.random() # +/- 10% jitter - self.current_delay = backoff + jitter - - if self.logger: - self.logger.warning(f"[Stream-Mapparr] Rate limit/Server error ({status_code}). Backing off to {self.current_delay:.2f}s") - else: - self.current_delay = self.base_delay + """Call this before an operation to pace execution.""" + if not self.disabled and self.base_delay > 0: + time.sleep(self.base_delay) class Plugin: """Dispatcharr Stream-Mapparr Plugin""" @@ -372,27 +335,6 @@ class Plugin: "default": PluginConfig.DEFAULT_FUZZY_MATCH_THRESHOLD, "help_text": f"Minimum similarity score (0-100) for fuzzy matching. Higher values require closer matches. Default: {PluginConfig.DEFAULT_FUZZY_MATCH_THRESHOLD}", }, - { - "id": "dispatcharr_url", - "label": "🌐 Dispatcharr URL", - "type": "string", - "default": PluginConfig.DEFAULT_DISPATCHARR_URL, - "placeholder": "http://192.168.1.10:9191", - "help_text": "URL of your Dispatcharr instance (from your browser address bar). Example: http://127.0.0.1:9191", - }, - { - "id": "dispatcharr_username", - "label": "šŸ‘¤ Dispatcharr Admin Username", - "type": "string", - "help_text": "Your admin username for the Dispatcharr UI. Required for API access.", - }, - { - "id": "dispatcharr_password", - "label": "šŸ”‘ Dispatcharr Admin Password", - "type": "string", - "input_type": "password", - "help_text": "Your admin password for the Dispatcharr UI. Required for API access.", - }, { "id": "profile_name", "label": "šŸ“‹ Profile Name", @@ -486,7 +428,7 @@ class Plugin: {"label": "High (Slow)", "value": "high"}, ], "default": PluginConfig.DEFAULT_RATE_LIMITING, - "help_text": "Controls delay between API calls. None=No delays, Low=Fast/Aggressive, Medium=Standard, High=Slow/Safe.", + "help_text": "Controls delay between operations. None=No delays, Low=Fast, Medium=Standard, High=Slow/Safe.", }, { "id": "timezone", @@ -699,8 +641,6 @@ class Plugin: self.loaded_streams = [] self.channel_stream_matches = [] self.fuzzy_matcher = None - self.api_token = None - self.token_expiration = None self.saved_settings = {} LOGGER.info(f"[Stream-Mapparr] {self.name} Plugin v{self.version} initialized") @@ -1100,321 +1040,55 @@ class Plugin: LOGGER.warning(f"[Stream-Mapparr] Failed to initialize FuzzyMatcher: {e}") self.fuzzy_matcher = None - def get_or_refresh_api_token(self, settings, logger): - """Get API token from cache or refresh if expired.""" - if self.api_token and self.token_expiration and self.token_expiration > datetime.now(): - logger.debug("[Stream-Mapparr] Using cached API token.") - return self.api_token, None + # ========================================================================= + # ORM HELPER METHODS - Direct database access (replaces HTTP API methods) + # ========================================================================= - logger.debug("[Stream-Mapparr] API token is expired or not found, getting a new one.") - token, error = self._get_api_token(settings, logger) - if token: - self.api_token = token - self.token_expiration = datetime.now() + timedelta(minutes=PluginConfig.API_TOKEN_CACHE_DURATION) - logger.debug(f"[Stream-Mapparr] API token cached for {PluginConfig.API_TOKEN_CACHE_DURATION} minutes.") + def _get_all_profiles(self, logger): + """Fetch all channel profiles via Django ORM.""" + return list(ChannelProfile.objects.all().values('id', 'name')) - return token, error - - def _get_api_token(self, settings, logger): - """Get an API access token using username and password.""" - dispatcharr_url = settings.get("dispatcharr_url") or "" - dispatcharr_url = dispatcharr_url.strip().rstrip('/') if dispatcharr_url else "" - username = settings.get("dispatcharr_username") or "" - password = settings.get("dispatcharr_password") or "" - - if not all([dispatcharr_url, username, password]): - return None, "Dispatcharr URL, Username, and Password must be configured in the plugin settings." + def _get_all_groups(self, logger): + """Fetch all channel groups via Django ORM.""" + return list(ChannelGroup.objects.all().values('id', 'name')) + def _get_all_channels(self, logger): + """Fetch all channels via Django ORM.""" + fields = ['id', 'name', 'channel_number', 'channel_group_id'] + # Include attached_channel_id if the model has it (used by visibility management) try: - url = f"{dispatcharr_url}/api/accounts/token/" - payload = {"username": username, "password": password} + Channel._meta.get_field('attached_channel') + fields.append('attached_channel_id') + except Exception: + pass + return list(Channel.objects.select_related('channel_group').all().values(*fields)) - logger.debug(f"[Stream-Mapparr] Attempting to authenticate with Dispatcharr at: {url}") - response = requests.post(url, json=payload, timeout=15) + def _get_all_streams(self, logger): + """Fetch all streams via Django ORM, returning dicts compatible with existing processing logic.""" + return list(Stream.objects.all().values( + 'id', 'name', 'm3u_account', 'channel_group', 'group_title' + )) - if response.status_code == 401: - logger.error("[Stream-Mapparr] Authentication failed - invalid credentials") - return None, "Authentication failed. Please check your username and password in the plugin settings." - elif response.status_code == 404: - logger.error(f"[Stream-Mapparr] API endpoint not found - check Dispatcharr URL: {dispatcharr_url}") - return None, f"API endpoint not found. Please verify your Dispatcharr URL: {dispatcharr_url}" - elif response.status_code >= 500: - logger.error(f"[Stream-Mapparr] Server error from Dispatcharr: {response.status_code}") - return None, f"Dispatcharr server error ({response.status_code}). Please check if Dispatcharr is running properly." + def _get_all_m3u_accounts(self, logger): + """Fetch all M3U accounts via Django ORM.""" + from apps.m3u.models import M3UAccount + return list(M3UAccount.objects.all().values('id', 'name')) - response.raise_for_status() - token_data = response.json() - access_token = token_data.get("access") - - if not access_token: - logger.error("[Stream-Mapparr] No access token returned from API") - return None, "Login successful, but no access token was returned by the API." - - logger.debug("[Stream-Mapparr] Successfully obtained API access token") - return access_token, None - - except requests.exceptions.ConnectionError as e: - logger.error(f"[Stream-Mapparr] Connection error: {e}") - return None, f"Unable to connect to Dispatcharr at {dispatcharr_url}. Please check the URL and ensure Dispatcharr is running." - except requests.exceptions.Timeout as e: - logger.error(f"[Stream-Mapparr] Request timeout: {e}") - return None, "Request timed out while connecting to Dispatcharr. Please check your network connection." - except requests.RequestException as e: - logger.error(f"[Stream-Mapparr] Request error: {e}") - return None, f"Network error occurred while authenticating: {e}" - except json.JSONDecodeError as e: - logger.error(f"[Stream-Mapparr] Invalid JSON response: {e}") - return None, "Invalid response from Dispatcharr API. Please check if the URL is correct." - except Exception as e: - logger.error(f"[Stream-Mapparr] Unexpected error during authentication: {e}") - return None, f"Unexpected error during authentication: {e}" - - def _get_api_data(self, endpoint, token, settings, logger, limiter=None): - """Helper to perform GET requests to the Dispatcharr API with rate limiting support.""" - dispatcharr_url = settings.get("dispatcharr_url") or "" - dispatcharr_url = dispatcharr_url.strip().rstrip('/') if dispatcharr_url else "" - url = f"{dispatcharr_url}{endpoint}" - headers = {'Authorization': f'Bearer {token}', 'Accept': 'application/json'} - - try: - if limiter: limiter.wait() - logger.debug(f"[Stream-Mapparr] Making API request to: {endpoint}") - response = requests.get(url, headers=headers, timeout=PluginConfig.API_REQUEST_TIMEOUT) - - # --- Smart Rate Limiter Logic --- - if limiter: - if response.ok: - limiter.report_success() - else: - limiter.report_error(response.status_code) - # -------------------------------- - - if response.status_code == 401: - logger.error("[Stream-Mapparr] API token expired or invalid, attempting to refresh.") - self.api_token = None # Invalidate token - new_token, error = self.get_or_refresh_api_token(settings, logger) - if error: - raise Exception("Failed to refresh API token.") - - # Retry request with new token - headers['Authorization'] = f'Bearer {new_token}' - if limiter: limiter.wait() # Wait before retry - response = requests.get(url, headers=headers, timeout=PluginConfig.API_REQUEST_TIMEOUT) - - if limiter: - if response.ok: limiter.report_success() - else: limiter.report_error(response.status_code) - - if response.status_code == 403: - logger.error("[Stream-Mapparr] API access forbidden") - raise Exception("API access forbidden. Check user permissions.") - elif response.status_code == 404: - logger.error(f"[Stream-Mapparr] API endpoint not found: {endpoint}") - raise Exception(f"API endpoint not found: {endpoint}") - - response.raise_for_status() - - # Check if response has content before trying to parse JSON - if not response.text or response.text.strip() == '': - logger.warning(f"[Stream-Mapparr] API returned empty response for {endpoint}") - return [] - - try: - json_data = response.json() - except json.JSONDecodeError as e: - logger.error(f"[Stream-Mapparr] Invalid JSON response for {endpoint}: {e}") - logger.debug(f"[Stream-Mapparr] Response content: {response.text[:200]}") - raise Exception(f"API returned invalid JSON: {e}") - - if isinstance(json_data, dict): - return json_data.get('results', json_data) - elif isinstance(json_data, list): - return json_data - return [] - - except requests.exceptions.RequestException as e: - logger.error(f"[Stream-Mapparr] API request failed for {endpoint}: {e}") - raise Exception(f"API request failed: {e}") - - def _patch_api_data(self, endpoint, token, payload, settings, logger, limiter=None): - """Helper to perform PATCH requests to the Dispatcharr API with rate limiting support.""" - dispatcharr_url = settings.get("dispatcharr_url") or "" - dispatcharr_url = dispatcharr_url.strip().rstrip('/') if dispatcharr_url else "" - url = f"{dispatcharr_url}{endpoint}" - headers = {'Authorization': f'Bearer {token}', 'Content-Type': 'application/json'} - - try: - if limiter: limiter.wait() - logger.debug(f"[Stream-Mapparr] Making API PATCH request to: {endpoint}") - response = requests.patch(url, headers=headers, json=payload, timeout=60) - - # --- Smart Rate Limiter Logic --- - if limiter: - if response.ok: - limiter.report_success() - else: - limiter.report_error(response.status_code) - # -------------------------------- - - if response.status_code == 401: - logger.error("[Stream-Mapparr] API token expired or invalid, attempting to refresh.") - self.api_token = None # Invalidate token - new_token, error = self.get_or_refresh_api_token(settings, logger) - if error: - raise Exception("Failed to refresh API token.") - - # Retry request with new token - headers['Authorization'] = f'Bearer {new_token}' - if limiter: limiter.wait() - response = requests.patch(url, headers=headers, json=payload, timeout=60) - if limiter: - if response.ok: limiter.report_success() - else: limiter.report_error(response.status_code) - - if response.status_code == 403: - logger.error("[Stream-Mapparr] API access forbidden") - raise Exception("API access forbidden. Check user permissions.") - elif response.status_code == 404: - logger.error(f"[Stream-Mapparr] API endpoint not found: {endpoint}") - raise Exception(f"API endpoint not found: {endpoint}") - - response.raise_for_status() - - # Check if response has content before trying to parse JSON - if not response.text or response.text.strip() == '': - logger.warning(f"[Stream-Mapparr] API returned empty response for {endpoint}") - return {} - - try: - return response.json() - except json.JSONDecodeError as e: - logger.error(f"[Stream-Mapparr] Invalid JSON response for {endpoint}: {e}") - logger.debug(f"[Stream-Mapparr] Response content: {response.text[:200]}") - raise Exception(f"API returned invalid JSON: {e}") - - except requests.exceptions.RequestException as e: - logger.error(f"[Stream-Mapparr] API PATCH request failed for {endpoint}: {e}") - raise Exception(f"API PATCH request failed: {e}") - - def _post_api_data(self, endpoint, token, payload, settings, logger): - """Helper to perform POST requests to the Dispatcharr API.""" - dispatcharr_url = settings.get("dispatcharr_url") or "" - dispatcharr_url = dispatcharr_url.strip().rstrip('/') if dispatcharr_url else "" - url = f"{dispatcharr_url}{endpoint}" - headers = {'Authorization': f'Bearer {token}', 'Content-Type': 'application/json'} - - try: - logger.debug(f"[Stream-Mapparr] Making API POST request to: {endpoint}") - response = requests.post(url, headers=headers, json=payload, timeout=PluginConfig.API_REQUEST_TIMEOUT) - - if response.status_code == 401: - logger.error("[Stream-Mapparr] API token expired or invalid, attempting to refresh.") - self.api_token = None # Invalidate token - new_token, error = self.get_or_refresh_api_token(settings, logger) - if error: - raise Exception("Failed to refresh API token.") - - # Retry request with new token - headers['Authorization'] = f'Bearer {new_token}' - response = requests.post(url, headers=headers, json=payload, timeout=PluginConfig.API_REQUEST_TIMEOUT) - - if response.status_code == 403: - logger.error("[Stream-Mapparr] API access forbidden") - raise Exception("API access forbidden. Check user permissions.") - elif response.status_code == 404: - logger.error(f"[Stream-Mapparr] API endpoint not found: {endpoint}") - raise Exception(f"API endpoint not found: {endpoint}") - - response.raise_for_status() - - # Check if response has content before trying to parse JSON - if not response.text or response.text.strip() == '': - logger.warning(f"[Stream-Mapparr] API returned empty response for {endpoint}") - return {} - - try: - return response.json() - except json.JSONDecodeError as e: - logger.error(f"[Stream-Mapparr] Invalid JSON response for {endpoint}: {e}") - logger.debug(f"[Stream-Mapparr] Response content: {response.text[:200]}") - raise Exception(f"API returned invalid JSON: {e}") - - except requests.exceptions.RequestException as e: - logger.error(f"[Stream-Mapparr] API POST request failed for {endpoint}: {e}") - raise Exception(f"API POST request failed: {e}") - - def _delete_api_data(self, endpoint, token, settings, logger, limiter=None): - """DELETE request to Dispatcharr API with automatic token refresh. - - Args: - endpoint: API endpoint (e.g., "/api/channels/channel-streams/123/") - token: Bearer token for authentication - settings: Plugin settings dict - logger: Logger instance - limiter: Optional SmartRateLimiter instance - - Returns: - bool: True if successful, False otherwise - """ - dispatcharr_url = settings.get("dispatcharr_url", PluginConfig.DEFAULT_DISPATCHARR_URL) - if not dispatcharr_url: - logger.error("[Stream-Mapparr] Dispatcharr URL not configured") - return False - - url = f"{dispatcharr_url.rstrip('/')}{endpoint}" - headers = { - "Authorization": f"Bearer {token}", - "Content-Type": "application/json" - } - - try: - if limiter: - limiter.wait() - - response = requests.delete(url, headers=headers, timeout=PluginConfig.API_REQUEST_TIMEOUT) - - # Handle 401 (token expired) - refresh and retry once - if response.status_code == 401: - logger.info("[Stream-Mapparr] Token expired during DELETE, refreshing...") - new_token = self._get_api_token(settings, logger) - if new_token: - headers["Authorization"] = f"Bearer {new_token}" - response = requests.delete(url, headers=headers, timeout=PluginConfig.API_REQUEST_TIMEOUT) - - if limiter: - if response.status_code == 200 or response.status_code == 204: - limiter.report_success() - else: - limiter.report_error(response.status_code) - - if response.status_code in [200, 204]: - return True - else: - logger.error(f"[Stream-Mapparr] DELETE {endpoint} failed: {response.status_code}") - return False - - except Exception as e: - logger.error(f"[Stream-Mapparr] Error in DELETE {endpoint}: {str(e)}") - return False + def _get_stream_groups(self, logger): + """Fetch distinct stream group titles via Django ORM.""" + group_titles = Stream.objects.values_list('group_title', flat=True).distinct() + return [g for g in group_titles if g] def _trigger_frontend_refresh(self, settings, logger): """Trigger frontend channel list refresh via WebSocket""" try: - from channels.layers import get_channel_layer - from asgiref.sync import async_to_sync - - channel_layer = get_channel_layer() - if channel_layer: - # Send WebSocket message to trigger frontend refresh - async_to_sync(channel_layer.group_send)( - "dispatcharr_updates", - { - "type": "channels.updated", - "message": "Channel visibility updated by Event Channel Managarr" - } - ) - logger.debug("[Stream-Mapparr] Frontend refresh triggered via WebSocket") - return True + send_websocket_update('updates', 'update', { + "type": "plugin", + "plugin": self.name, + "message": "Channels updated" + }) + logger.info("[Stream-Mapparr] Frontend refresh triggered via WebSocket") + return True except Exception as e: logger.warning(f"[Stream-Mapparr] Could not trigger frontend refresh: {e}") return False @@ -2455,42 +2129,13 @@ class Plugin: return None def save_settings(self, settings, context): - """Save settings and sync schedules automatically""" + """Save settings. Schedule changes are applied via the Update Schedule action.""" try: LOGGER.debug(f"[Stream-Mapparr] Saving settings with keys: {list(settings.keys())}") - - # Get timezone and schedule settings - user_timezone = settings.get("timezone") or PluginConfig.DEFAULT_TIMEZONE - enabled = settings.get("schedule_enabled", False) - if isinstance(enabled, str): - enabled = enabled.lower() in ('true', '1', 'yes', 'on') - - cron_schedule = settings.get("schedule_cron") or "" - cron_schedule = cron_schedule.strip() if cron_schedule else "" - - LOGGER.debug(f"[Stream-Mapparr] Schedule settings: enabled={enabled}, cron='{cron_schedule}', tz={user_timezone}") - - # Sync the schedule - if enabled and cron_schedule: - if not self._validate_cron(cron_schedule): - return { - "success": False, - "message": f"Invalid cron expression: {cron_schedule}" - } - self._create_or_update_schedule(cron_schedule, user_timezone, settings) - message = f"āœ… Settings saved!\nšŸ“… Schedule activated: {cron_schedule} ({user_timezone})" - else: - self._delete_schedule() - if enabled: - message = "āœ… Settings saved!\nāš ļø Schedule enabled but no cron expression configured" - else: - message = "āœ… Settings saved!\nā„¹ļø Scheduled runs disabled" - return { "success": True, - "message": message + "message": "Settings saved! Use 'Update Schedule' to apply schedule changes." } - except Exception as e: LOGGER.error(f"[Stream-Mapparr] Error saving settings: {e}", exc_info=True) return { @@ -2659,18 +2304,23 @@ class Plugin: """Helper method to validate plugin settings.""" validation_results = [] has_errors = False - token = None try: - # 1. Validate API connection and obtain token - logger.debug("[Stream-Mapparr] Validating API connection...") - token, error = self.get_or_refresh_api_token(settings, logger) - if error: - validation_results.append(f"āŒ API Connection: {error}") + # 1. Test database connectivity directly + logger.debug("[Stream-Mapparr] Validating database connectivity...") + try: + channel_count = Channel.objects.count() + group_count = ChannelGroup.objects.count() + stream_count = Stream.objects.count() + profile_count = ChannelProfile.objects.count() + validation_results.append( + f"āœ… DB OK ({channel_count} channels, {group_count} groups, " + f"{stream_count} streams, {profile_count} profiles)" + ) + except Exception as e: + validation_results.append(f"āŒ DB error: {str(e)[:50]}") has_errors = True - return has_errors, validation_results, token - else: - validation_results.append("āœ… API Connection") + return has_errors, validation_results # 2. Validate profile name exists logger.debug("[Stream-Mapparr] Validating profile names...") @@ -2681,7 +2331,7 @@ class Plugin: has_errors = True else: profile_names = [name.strip() for name in profile_names_str.split(',') if name.strip()] - profiles = self._get_api_data("/api/channels/profiles/", token, settings, logger) + profiles = self._get_all_profiles(logger) available_profile_names = [p.get('name') for p in profiles if 'name' in p] missing_profiles = [] @@ -2706,46 +2356,12 @@ class Plugin: logger.debug("[Stream-Mapparr] Validating channel groups...") selected_groups_str = settings.get("selected_groups") or "" selected_groups_str = selected_groups_str.strip() if selected_groups_str else "" - - if selected_groups_str: - # Groups are specified, validate them - selected_groups = [g.strip() for g in selected_groups_str.split(',') if g.strip()] - - # Fetch all groups from API - all_groups = [] - page = 1 - while True: - try: - api_groups = self._get_api_data(f"/api/channels/groups/?page={page}", token, settings, logger) - except Exception as e: - # If we get an error (e.g., 404 for non-existent page), we've reached the end - if page > 1: - logger.debug(f"[Stream-Mapparr] No more group pages available (attempted page {page})") - break - else: - # If error on first page, re-raise - raise - if isinstance(api_groups, dict) and 'results' in api_groups: - results = api_groups['results'] - if not results: - logger.debug("[Stream-Mapparr] Reached last page of groups (empty results)") - break - all_groups.extend(results) - if not api_groups.get('next'): - break - page += 1 - elif isinstance(api_groups, list): - if not api_groups: - logger.debug("[Stream-Mapparr] Reached last page of groups (empty results)") - break - all_groups.extend(api_groups) - break - else: - break - + if selected_groups_str: + selected_groups = [g.strip() for g in selected_groups_str.split(',') if g.strip()] + all_groups = self._get_all_groups(logger) available_group_names = [g.get('name') for g in all_groups if 'name' in g] - + missing_groups = [] found_groups = [] for group_name in selected_groups: @@ -2753,14 +2369,13 @@ class Plugin: found_groups.append(group_name) else: missing_groups.append(group_name) - + if missing_groups: validation_results.append(f"āŒ Channel Groups: '{', '.join(missing_groups)}' not found") has_errors = True else: validation_results.append(f"āœ… Channel Groups ({len(found_groups)})") else: - # No groups specified is valid (means all groups) validation_results.append("āœ… Channel Groups (all)") # 4. Validate timezone is not empty @@ -2807,7 +2422,7 @@ class Plugin: else: validation_results.append(f"āœ… Channel Databases ({len(enabled_databases)})") - return has_errors, validation_results, token + return has_errors, validation_results except Exception as e: logger.error(f"[Stream-Mapparr] Error validating settings: {str(e)}") @@ -2815,11 +2430,11 @@ class Plugin: logger.error(f"Traceback: {traceback.format_exc()}") validation_results.append(f"āŒ Validation error: {str(e)}") has_errors = True - return has_errors, validation_results, token + return has_errors, validation_results def validate_settings_action(self, settings, logger): """Validate all plugin settings including profiles, groups, and API connection.""" - has_errors, validation_results, token = self._validate_plugin_settings(settings, logger) + has_errors, validation_results = self._validate_plugin_settings(settings, logger) if has_errors: # Separate errors from successes @@ -2830,163 +2445,12 @@ class Plugin: message = "āœ… All settings validated successfully!\n\n" + "\n".join(validation_results) return {"status": "success", "message": message} - def sync_schedules_action(self, settings, logger): - """Sync schedules from settings""" - try: - user_timezone = settings.get("timezone") or PluginConfig.DEFAULT_TIMEZONE - enabled = settings.get("schedule_enabled", False) - if isinstance(enabled, str): - enabled = enabled.lower() in ('true', '1', 'yes', 'on') - - cron_schedule = settings.get("schedule_cron") or "" - cron_schedule = cron_schedule.strip() if cron_schedule else "" - - logger.debug(f"[Stream-Mapparr] Syncing schedule: enabled={enabled}, schedule='{cron_schedule}', tz={user_timezone}") - - if enabled and cron_schedule: - if not self._validate_cron(cron_schedule): - return { - "status": "error", - "message": f"Invalid cron expression: {cron_schedule}" - } - self._create_or_update_schedule(cron_schedule, user_timezone, settings) - return { - "status": "success", - "message": f"āœ… Schedule synced! Cron: {cron_schedule} ({user_timezone})" - } - else: - self._delete_schedule() - if not enabled: - return { - "status": "success", - "message": "ā„¹ļø Schedule disabled and removed" - } - else: - return { - "status": "success", - "message": "ā„¹ļø No cron expression configured" - } - - except Exception as e: - logger.error(f"[Stream-Mapparr] Error syncing schedule: {e}", exc_info=True) - return {"status": "error", "message": f"Error: {str(e)}"} - - def view_schedules_action(self, settings, logger): - """View active schedule""" - try: - user_timezone = settings.get("timezone", PluginConfig.DEFAULT_TIMEZONE) - logger.debug(f"[Stream-Mapparr] Viewing schedules with timezone: {user_timezone}") - - task_name = "stream_mapparr_scheduled_task" - task = PeriodicTask.objects.filter(name=task_name, enabled=True).first() - - if task and task.crontab: - cron = task.crontab - cron_expr = f"{cron.minute} {cron.hour} {cron.day_of_month} {cron.month_of_year} {cron.day_of_week}" - - # Try to convert back to user's timezone for display - local_time = self._convert_utc_to_local(cron.minute, cron.hour, user_timezone) - - if local_time: - message = f"šŸ“… Stream-Mapparr Schedule:\n • Cron: {cron_expr} UTC ({local_time} {user_timezone})\n • Status: Active" - else: - message = f"šŸ“… Stream-Mapparr Schedule:\n • Cron: {cron_expr} UTC\n • Status: Active" - else: - message = "ā„¹ļø No active schedule found" - - return {"status": "success", "message": message} - - except Exception as e: - logger.error(f"[Stream-Mapparr] Error viewing schedules: {e}", exc_info=True) - return {"status": "error", "message": f"Error: {str(e)}"} - - def cleanup_schedule_action(self, settings, logger): - """Remove schedule created by this plugin""" - try: - task_name = "stream_mapparr_scheduled_task" - deleted_count = PeriodicTask.objects.filter(name=task_name).delete()[0] - - if deleted_count > 0: - message = f"āœ… Removed Stream-Mapparr schedule" - else: - message = "ā„¹ļø No schedule found to remove" - - return {"status": "success", "message": message} - - except Exception as e: - logger.error(f"[Stream-Mapparr] Error cleaning up schedule: {e}", exc_info=True) - return {"status": "error", "message": f"Error: {str(e)}"} - - def test_celery_task_action(self, settings, logger): - """Test if Celery task can be called and check registration""" - try: - messages = [] - - # Try to import the task - try: - from . import tasks - messages.append("āœ… Task module imported successfully") - except Exception as e: - messages.append(f"āŒ Failed to import task module: {e}") - return {"status": "error", "message": "\n".join(messages)} - - # Try to call the task directly (non-async) - try: - result = tasks.run_scheduled_stream_mapping() - messages.append("āœ… Task executed directly (non-Celery)") - messages.append(f" Result: {result.get('status', 'unknown')}") - except Exception as e: - messages.append(f"āš ļø Direct execution failed: {e}") - - # Check if Celery can see the task - try: - from celery import current_app - registered_tasks = list(current_app.tasks.keys()) - task_name = 'stream_mapparr.run_scheduled_stream_mapping' - - if task_name in registered_tasks: - messages.append(f"āœ… Task registered in Celery: {task_name}") - else: - messages.append(f"āŒ Task NOT found in Celery registry") - messages.append(f" Looking for: {task_name}") - # Show similar tasks - similar = [t for t in registered_tasks if 'stream' in t.lower() or 'mapparr' in t.lower()] - if similar: - messages.append(f" Similar tasks: {similar}") - except Exception as e: - messages.append(f"āš ļø Could not check Celery registry: {e}") - - # Check periodic task in database - try: - task_name = "stream_mapparr_scheduled_task" - task = PeriodicTask.objects.filter(name=task_name).first() - if task: - messages.append(f"āœ… Periodic task found in database:") - messages.append(f" Task: {task.task}") - messages.append(f" Enabled: {task.enabled}") - if task.crontab: - cron = task.crontab - messages.append(f" Schedule: {cron.minute} {cron.hour} {cron.day_of_month} {cron.month_of_year} {cron.day_of_week}") - else: - messages.append(f"ā„¹ļø No periodic task found in database") - except Exception as e: - messages.append(f"āš ļø Error checking database: {e}") - - return {"status": "success", "message": "\n".join(messages)} - - except Exception as e: - logger.error(f"[Stream-Mapparr] Error testing task: {e}", exc_info=True) - return {"status": "error", "message": f"Error: {str(e)}"} - def load_process_channels_action(self, settings, logger, context=None): """Load and process channels from specified profile and groups.""" try: - # Create the rate limiter instance once - limiter = SmartRateLimiter(settings.get("rate_limiting", "none"), logger) - self._send_progress_update("load_process_channels", 'running', 5, 'Validating settings...', context) logger.debug("[Stream-Mapparr] Validating settings before loading channels...") - has_errors, validation_results, token = self._validate_plugin_settings(settings, logger) + has_errors, validation_results = self._validate_plugin_settings(settings, logger) if has_errors: return {"status": "error", "message": "Cannot load channels - validation failed."} @@ -3027,9 +2491,9 @@ class Plugin: profile_names = [name.strip() for name in profile_names_str.split(',') if name.strip()] ignore_tags = self._parse_tags(ignore_tags_str) if ignore_tags_str else [] - # Fetch profiles with rate limiting + # Fetch profiles via ORM self._send_progress_update("load_process_channels", 'running', 20, 'Fetching profiles...', context) - profiles = self._get_api_data("/api/channels/profiles/", token, settings, logger, limiter=limiter) + profiles = self._get_all_profiles(logger) target_profiles = [] profile_ids = [] @@ -3046,86 +2510,33 @@ class Plugin: profile_id = profile_ids[0] - # Fetch groups with rate limiting + # Fetch groups via ORM self._send_progress_update("load_process_channels", 'running', 30, 'Fetching channel groups...', context) - all_groups = [] - page = 1 - while True: - try: - api_groups = self._get_api_data(f"/api/channels/groups/?page={page}", token, settings, logger, limiter=limiter) - except Exception as e: - # If we get an error (e.g., 404 for non-existent page), we've reached the end - if page > 1: - logger.debug(f"[Stream-Mapparr] No more group pages available (attempted page {page})") - break - else: - # If error on first page, re-raise - raise - - if isinstance(api_groups, dict) and 'results' in api_groups: - results = api_groups['results'] - if not results: - logger.debug("[Stream-Mapparr] Reached last page of groups (empty results)") - break - all_groups.extend(results) - if not api_groups.get('next'): - break - page += 1 - elif isinstance(api_groups, list): - if not api_groups: - logger.debug("[Stream-Mapparr] Reached last page of groups (empty results)") - break - all_groups.extend(api_groups) - break - else: - break - + all_groups = self._get_all_groups(logger) group_name_to_id = {g['name']: g['id'] for g in all_groups if 'name' in g and 'id' in g} - # Fetch stream groups with rate limiting (returns array of group name strings) + # Fetch stream groups via ORM self._send_progress_update("load_process_channels", 'running', 35, 'Fetching stream groups...', context) - all_stream_groups = [] - try: - api_stream_groups = self._get_api_data("/api/channels/streams/groups/", token, settings, logger, limiter=limiter) - if isinstance(api_stream_groups, list): - all_stream_groups = api_stream_groups - logger.info(f"[Stream-Mapparr] Found {len(all_stream_groups)} stream groups") - else: - logger.warning(f"[Stream-Mapparr] Unexpected stream groups response format: {type(api_stream_groups)}") - except Exception as e: - # Stream groups might not be available in this API version - logger.warning(f"[Stream-Mapparr] Could not fetch stream groups (API may not support this endpoint): {e}") + all_stream_groups = self._get_stream_groups(logger) + logger.info(f"[Stream-Mapparr] Found {len(all_stream_groups)} stream groups") - # Fetch M3U sources with rate limiting (returns array of M3U account objects) + # Fetch M3U sources via ORM self._send_progress_update("load_process_channels", 'running', 37, 'Fetching M3U sources...', context) - all_m3us = [] - try: - api_m3us = self._get_api_data("/api/m3u/accounts/", token, settings, logger, limiter=limiter) - if isinstance(api_m3us, list): - all_m3us = api_m3us - logger.info(f"[Stream-Mapparr] Found {len(all_m3us)} M3U sources") - else: - logger.warning(f"[Stream-Mapparr] Unexpected M3U sources response format: {type(api_m3us)}") - except Exception as e: - # M3U sources might not be available in this API version - logger.warning(f"[Stream-Mapparr] Could not fetch M3U sources (API may not support this endpoint): {e}") + all_m3us = self._get_all_m3u_accounts(logger) + logger.info(f"[Stream-Mapparr] Found {len(all_m3us)} M3U sources") m3u_name_to_id = {m['name']: m['id'] for m in all_m3us if 'name' in m and 'id' in m} - # Fetch channels with rate limiting + # Fetch channels via ORM self._send_progress_update("load_process_channels", 'running', 40, 'Fetching channels...', context) - all_channels = self._get_api_data("/api/channels/channels/", token, settings, logger, limiter=limiter) + all_channels = self._get_all_channels(logger) - channels_in_profile = [] - for channel in all_channels: - channel_id = channel['id'] - is_in_profile = ChannelProfileMembership.objects.filter( - channel_id=channel_id, - channel_profile_id__in=profile_ids, - enabled=True - ).exists() - if is_in_profile: - channels_in_profile.append(channel) + # Filter channels by profile membership (single bulk query) + profile_channel_ids = set(ChannelProfileMembership.objects.filter( + channel_profile_id__in=profile_ids, + enabled=True + ).values_list('channel_id', flat=True)) + channels_in_profile = [ch for ch in all_channels if ch['id'] in profile_channel_ids] if selected_groups_str: selected_groups = [g.strip() for g in selected_groups_str.split(',') if g.strip()] @@ -3145,62 +2556,10 @@ class Plugin: channels_to_process = channels_in_profile - # Fetch streams with rate limiting - self._send_progress_update("load_process_channels", 'running', 60, 'Fetching streams (this may take a while)...', context) - logger.info("[Stream-Mapparr] Fetching all streams from all groups (unlimited)...") - all_streams_data = [] - page = 1 - while True: - endpoint = f"/api/channels/streams/?page={page}&page_size=100" - - try: - streams_response = self._get_api_data(endpoint, token, settings, logger, limiter=limiter) - except Exception as e: - # If we get an error (e.g., 404 for non-existent page), we've reached the end - if page > 1: - logger.debug(f"[Stream-Mapparr] No more pages available (attempted page {page})") - break - else: - # If error on first page, re-raise - raise - - # Handle both paginated and non-paginated responses - if isinstance(streams_response, dict) and 'results' in streams_response: - results = streams_response['results'] - - # Check if we got empty results - if not results: - logger.debug("[Stream-Mapparr] Reached last page of streams (empty results)") - break - - all_streams_data.extend(results) - logger.debug(f"[Stream-Mapparr] Fetched page {page}: {len(results)} streams (total so far: {len(all_streams_data)})") - - # Stop if this page had fewer results than page_size (last page) - if len(results) < 100: - logger.debug("[Stream-Mapparr] Reached last page of streams") - break - - page += 1 - elif isinstance(streams_response, list): - # Check if we got empty results - if not streams_response: - logger.debug("[Stream-Mapparr] Reached last page of streams (empty results)") - break - - # List response - could still be paginated - all_streams_data.extend(streams_response) - logger.debug(f"[Stream-Mapparr] Fetched page {page}: {len(streams_response)} streams (total so far: {len(all_streams_data)})") - - # If we got exactly 100 results, there might be more pages - if len(streams_response) == 100: - page += 1 - else: - logger.debug("[Stream-Mapparr] Reached last page of streams") - break - else: - logger.warning("[Stream-Mapparr] Unexpected streams response format") - break + # Fetch all streams via ORM + self._send_progress_update("load_process_channels", 'running', 60, 'Fetching streams...', context) + logger.info("[Stream-Mapparr] Fetching all streams via ORM...") + all_streams_data = self._get_all_streams(logger) # Filter streams by selected stream groups (uses channel_group field) if selected_stream_groups_str: @@ -3568,10 +2927,9 @@ class Plugin: try: self._send_progress_update("preview_changes", 'running', 5, 'Initializing preview...', context) - limiter = SmartRateLimiter(settings.get("rate_limiting", "none"), logger) self._send_progress_update("preview_changes", 'running', 10, 'Validating settings...', context) - has_errors, validation_results, token = self._validate_plugin_settings(settings, logger) + has_errors, validation_results = self._validate_plugin_settings(settings, logger) if has_errors: return {"status": "error", "message": "Validation failed."} channels_data = self._load_channels_data(logger, settings) @@ -3628,13 +2986,11 @@ class Plugin: group_stats = {} # Track stats for each group for group_key, group_channels in channel_groups.items(): - limiter.wait() - # Update progress tracker (automatically sends updates every minute) progress_tracker.update(items_processed=1) - + sorted_channels = self._sort_channels_by_priority(group_channels) - + # Get matches at current threshold for the primary channel matched_streams, cleaned_channel_name, cleaned_stream_names, match_reason, database_used = self._match_streams_to_channel( sorted_channels[0], streams, logger, ignore_tags, ignore_quality, ignore_regional, ignore_geographic, ignore_misc, channels_data, filter_dead @@ -3779,10 +3135,6 @@ class Plugin: self._send_progress_update("add_streams_to_channels", 'running', 5, 'Initializing stream assignment...', context) limiter = SmartRateLimiter(settings.get("rate_limiting", "none"), logger) - self._send_progress_update("add_streams_to_channels", 'running', 10, 'Authenticating...', context) - token, error = self._get_api_token(settings, logger) - if error: return {"status": "error", "message": error} - channels_data = self._load_channels_data(logger, settings) with open(self.processed_data_file, 'r') as f: processed_data = json.load(f) @@ -4060,29 +3412,19 @@ class Plugin: if validation_result.get('status') != 'success': return validation_result - # Get API token - logger.info("[Stream-Mapparr] Authenticating with Dispatcharr API...") - token, error = self.get_or_refresh_api_token(settings, logger) - if error or not token: - return {"status": "error", "message": f"Failed to authenticate with Dispatcharr API: {error}"} - - # Get rate limiter - rate_limiting = settings.get("rate_limiting", PluginConfig.DEFAULT_RATE_LIMITING) - limiter = SmartRateLimiter(rate_limiting, logger) - - # Load channels (same logic as load_process_channels_action for profile/group filtering) - logger.info("[Stream-Mapparr] Loading channels from Dispatcharr...") - + # Load channels via ORM + logger.info("[Stream-Mapparr] Loading channels via ORM...") + profile_names_str = settings.get("profile_name") or "" profile_names_str = profile_names_str.strip() if profile_names_str else "" selected_groups_str = settings.get("selected_groups") or "" selected_groups_str = selected_groups_str.strip() if selected_groups_str else "" - + profile_names = [name.strip() for name in profile_names_str.split(',') if name.strip()] - - # Fetch profiles - profiles = self._get_api_data("/api/channels/profiles/", token, settings, logger, limiter=limiter) - + + # Fetch profiles via ORM + profiles = self._get_all_profiles(logger) + target_profiles = [] profile_ids = [] for profile_name in profile_names: @@ -4095,130 +3437,48 @@ class Plugin: return {"status": "error", "message": f"Profile '{profile_name}' not found."} target_profiles.append(found_profile) profile_ids.append(found_profile['id']) - - # Fetch groups if group filtering is specified + + # Fetch groups via ORM if group filtering is specified + group_name_to_id = {} if selected_groups_str: - all_groups = [] - page = 1 - while True: - try: - api_groups = self._get_api_data(f"/api/channels/groups/?page={page}", token, settings, logger, limiter=limiter) - except Exception as e: - if page > 1: - break - else: - raise - - if isinstance(api_groups, dict) and 'results' in api_groups: - results = api_groups['results'] - if not results: - break - all_groups.extend(results) - if not api_groups.get('next'): - break - page += 1 - elif isinstance(api_groups, list): - if not api_groups: - break - all_groups.extend(api_groups) - break - else: - break - + all_groups = self._get_all_groups(logger) group_name_to_id = {g['name']: g['id'] for g in all_groups if 'name' in g and 'id' in g} - - # Fetch all channels - all_channels = self._get_api_data("/api/channels/channels/", token, settings, logger, limiter=limiter) - - # Filter channels by profile (enabled=True in profile) - channels_in_profile = [] - for channel in all_channels: - channel_id = channel['id'] - is_in_profile = ChannelProfileMembership.objects.filter( - channel_id=channel_id, - channel_profile_id__in=profile_ids, - enabled=True - ).exists() - if is_in_profile: - channels_in_profile.append(channel) - + + # Fetch all channels via ORM + all_channels = self._get_all_channels(logger) + + # Filter channels by profile (single bulk query) + profile_channel_ids = set(ChannelProfileMembership.objects.filter( + channel_profile_id__in=profile_ids, + enabled=True + ).values_list('channel_id', flat=True)) + channels_in_profile = [ch for ch in all_channels if ch['id'] in profile_channel_ids] + # Filter by groups if specified if selected_groups_str: selected_groups = [g.strip() for g in selected_groups_str.split(',') if g.strip()] valid_group_ids = [group_name_to_id[name] for name in selected_groups if name in group_name_to_id] if not valid_group_ids: return {"status": "error", "message": "None of the specified groups were found."} - + filtered_channels = [ch for ch in channels_in_profile if ch.get('channel_group_id') in valid_group_ids] channels = filtered_channels logger.info(f"[Stream-Mapparr] Filtered to {len(channels)} channels in groups: {', '.join(selected_groups)}") else: channels = channels_in_profile logger.info(f"[Stream-Mapparr] Using all channels from profile (no group filter)") - + if not channels: error_msg = "No channels found. Check profile and group filters." logger.error(f"[Stream-Mapparr] {error_msg}") return {"status": "error", "message": error_msg} - + logger.info(f"[Stream-Mapparr] Loaded {len(channels)} channels") - - # Load all streams - logger.info("[Stream-Mapparr] Loading all streams...") - all_streams = [] - page = 1 - while True: - endpoint = f"/api/channels/streams/?page={page}&page_size=100" - try: - streams_response = self._get_api_data(endpoint, token, settings, logger, limiter=limiter) - except Exception as e: - # If we get an error (e.g., 404 for non-existent page), we've reached the end - if page > 1: - logger.debug(f"[Stream-Mapparr] No more pages available (attempted page {page})") - break - else: - # If error on first page, re-raise - raise + # Load all streams via ORM + logger.info("[Stream-Mapparr] Loading all streams via ORM...") + all_streams = self._get_all_streams(logger) - # Handle both paginated and non-paginated responses - if isinstance(streams_response, dict) and 'results' in streams_response: - results = streams_response['results'] - - # Check if we got empty results - if not results: - logger.debug("[Stream-Mapparr] Reached last page of streams (empty results)") - break - - all_streams.extend(results) - logger.debug(f"[Stream-Mapparr] Fetched page {page}: {len(results)} streams (total so far: {len(all_streams)})") - - # Stop if this page had fewer results than page_size (last page) - if len(results) < 100: - logger.debug("[Stream-Mapparr] Reached last page of streams") - break - - page += 1 - elif isinstance(streams_response, list): - # Check if we got empty results - if not streams_response: - logger.debug("[Stream-Mapparr] Reached last page of streams (empty results)") - break - - # List response - could still be paginated - all_streams.extend(streams_response) - logger.debug(f"[Stream-Mapparr] Fetched page {page}: {len(streams_response)} streams (total so far: {len(all_streams)})") - - # If we got exactly 100 results, there might be more pages - if len(streams_response) == 100: - page += 1 - else: - logger.debug("[Stream-Mapparr] Reached last page of streams") - break - else: - logger.warning("[Stream-Mapparr] Unexpected streams response format") - break - if not all_streams: error_msg = "No streams found in Dispatcharr" logger.error(f"[Stream-Mapparr] {error_msg}") @@ -4460,74 +3720,37 @@ class Plugin: selected_groups_str = settings.get('selected_groups', '').strip() - # Get API token - token, error = self.get_or_refresh_api_token(settings, logger) - if error: - return {"status": "error", "message": error} - - # Fetch all channels for the profile + # Fetch all channels for the profile via ORM logger.info(f"[Stream-Mapparr] Fetching channels for profile: {profile_name}") - - # Get profile IDs - all_profiles = self._get_api_data("/api/channels/profiles/", token, settings, logger, None) + + # Get profile IDs via ORM + all_profiles = self._get_all_profiles(logger) profile_names = [p.strip() for p in profile_name.split(',')] profile_ids = [] for profile in all_profiles: if profile['name'] in profile_names: profile_ids.append(profile['id']) - + if not profile_ids: return {"status": "error", "message": f"Profile(s) not found: {profile_name}"} - - # Fetch channel groups if filtering is needed + + # Fetch channel groups via ORM if filtering is needed group_name_to_id = {} if selected_groups_str: logger.info(f"[Stream-Mapparr] Fetching channel groups for filtering...") - all_groups = [] - page = 1 - while True: - try: - api_groups = self._get_api_data(f"/api/channels/groups/?page={page}", token, settings, logger, None) - except Exception as e: - if page > 1: - logger.debug(f"[Stream-Mapparr] No more group pages available (attempted page {page})") - break - else: - raise - - if isinstance(api_groups, dict) and 'results' in api_groups: - results = api_groups['results'] - if not results: - break - all_groups.extend(results) - if not api_groups.get('next'): - break - page += 1 - elif isinstance(api_groups, list): - if not api_groups: - break - all_groups.extend(api_groups) - break - else: - break - + all_groups = self._get_all_groups(logger) group_name_to_id = {g['name']: g['id'] for g in all_groups if 'name' in g and 'id' in g} + + # Fetch ALL channels via ORM + all_channels = self._get_all_channels(logger) + logger.info(f"[Stream-Mapparr] Fetched {len(all_channels)} total channels") - # Fetch ALL channels from API - all_channels = self._get_api_data("/api/channels/channels/", token, settings, logger, None) - logger.info(f"[Stream-Mapparr] Fetched {len(all_channels)} total channels from API") - - # Filter to channels in the specified profile(s) using Django ORM - channels_in_profile = [] - for channel in all_channels: - channel_id = channel['id'] - is_in_profile = ChannelProfileMembership.objects.filter( - channel_id=channel_id, - channel_profile_id__in=profile_ids, - enabled=True - ).exists() - if is_in_profile: - channels_in_profile.append(channel) + # Filter to channels in the specified profile(s) (single bulk query) + profile_channel_ids = set(ChannelProfileMembership.objects.filter( + channel_profile_id__in=profile_ids, + enabled=True + ).values_list('channel_id', flat=True)) + channels_in_profile = [ch for ch in all_channels if ch['id'] in profile_channel_ids] logger.info(f"[Stream-Mapparr] Found {len(channels_in_profile)} channels in profile(s): {', '.join(profile_names)}") @@ -4548,14 +3771,14 @@ class Plugin: selected_m3us_str = settings.get('selected_m3us', '').strip() m3u_priority_map = {} if selected_m3us_str: - # Fetch M3U sources + # Fetch M3U sources via ORM try: - all_m3us = self._get_api_data("/api/m3u/accounts/", token, settings, logger, None) + all_m3us = self._get_all_m3u_accounts(logger) m3u_name_to_id = {m['name']: m['id'] for m in all_m3us if 'name' in m and 'id' in m} - + selected_m3us = [m.strip() for m in selected_m3us_str.split(',') if m.strip()] valid_m3u_ids = [m3u_name_to_id[name] for name in selected_m3us if name in m3u_name_to_id] - + if valid_m3u_ids: # Create M3U ID to priority mapping (0 = highest priority) m3u_priority_map = {m3u_id: idx for idx, m3u_id in enumerate(valid_m3u_ids)} @@ -4748,12 +3971,18 @@ class Plugin: channels = processed_data.get('channels', []) channels_data = self._load_channels_data(logger, settings) - # Step 1: Get stream counts + # Step 1: Get stream counts (single bulk query) self._send_progress_update("manage_channel_visibility", 'running', 20, 'Counting streams...', context) + from django.db.models import Count + channel_ids = [ch['id'] for ch in channels] + stream_counts_qs = ChannelStream.objects.filter( + channel_id__in=channel_ids + ).values('channel_id').annotate(count=Count('id')) + stream_count_map = {row['channel_id']: row['count'] for row in stream_counts_qs} channel_stream_counts = {} for channel in channels: - stream_count = ChannelStream.objects.filter(channel_id=channel['id']).count() - channel_stream_counts[channel['id']] = {'name': channel['name'], 'stream_count': stream_count} + ch_id = channel['id'] + channel_stream_counts[ch_id] = {'name': channel['name'], 'stream_count': stream_count_map.get(ch_id, 0)} # Step 2: Disable all channels using Django ORM self._send_progress_update("manage_channel_visibility", 'running', 40, f'Disabling all {len(channels)} channels...', context) @@ -4879,4 +4108,3 @@ class Plugin: except Exception as e: logger.error(f"[Stream-Mapparr] Error clearing operation lock: {e}") return {"status": "error", "message": f"Error clearing lock: {e}"} -