Update __init__.py

This commit is contained in:
Lyfesaver
2025-10-03 17:17:42 -05:00
committed by GitHub
parent 138d2ec2fe
commit 456e662578

View File

@@ -17,14 +17,13 @@ from homeassistant.util import slugify
from .const import DOMAIN
_LOGGER = logging.getLogger(__name__)
PLATFORMS = [Platform.SENSOR]
PLATFORMS = [Platform.SENSOR, Platform.MEDIA_PLAYER]
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Set up Dispatcharr from a config entry."""
coordinator = DispatcharrDataUpdateCoordinator(hass, entry)
# Perform initial data population. This will raise ConfigEntryNotReady on failure.
await coordinator.async_populate_channel_details()
await coordinator.async_populate_channel_map_from_xml()
await coordinator.async_config_entry_first_refresh()
hass.data.setdefault(DOMAIN, {})[entry.entry_id] = coordinator
@@ -32,6 +31,7 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
return True
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Unload a config entry."""
if unload_ok := await hass.config_entries.async_unload_platforms(entry, PLATFORMS):
@@ -46,7 +46,7 @@ class DispatcharrDataUpdateCoordinator(DataUpdateCoordinator):
self.config_entry = config_entry
self.websession = async_get_clientsession(hass)
self._access_token: str | None = None
self.channel_details: dict = {}
self.channel_map: dict = {}
super().__init__(
hass, _LOGGER, name=DOMAIN, update_interval=timedelta(seconds=30)
@@ -98,38 +98,96 @@ class DispatcharrDataUpdateCoordinator(DataUpdateCoordinator):
except aiohttp.ClientError as err:
raise UpdateFailed(f"API request to {url} failed: {err}") from err
async def async_populate_channel_details(self):
"""Fetch all channel details to build a lookup map."""
_LOGGER.info("Populating Dispatcharr channel details")
try:
all_channels = await self._api_request("GET", f"{self.base_url}/api/channels/channels/")
if isinstance(all_channels, list):
self.channel_details = {
channel['uuid']: channel for channel in all_channels if 'uuid' in channel
}
else:
_LOGGER.warning("Expected a list of channels, but received: %s", type(all_channels))
self.channel_details = {}
_LOGGER.debug("Found %d channels", len(self.channel_details))
except Exception as e:
_LOGGER.error("Could not populate channel details: %s", e)
raise ConfigEntryNotReady(f"Could not fetch channel details: {e}") from e
async def _get_current_programs_from_xml(self, numeric_channel_ids: list[str]) -> dict:
"""Get current program for EPG IDs by parsing the raw XMLTV file."""
if not numeric_channel_ids:
return {}
now = datetime.now(timezone.utc)
async def async_populate_channel_map_from_xml(self):
"""Fetch the XML file once to build a reliable map of channels."""
_LOGGER.info("Populating Dispatcharr channel map from XML file...")
try:
xml_string = await self._api_request("GET", f"{self.base_url}/output/epg", is_json=False)
root = ET.fromstring(xml_string)
except UpdateFailed as err:
raise ConfigEntryNotReady(f"Could not fetch EPG XML file to build channel map: {err}") from err
found_programs, channels_to_find = {}, set(numeric_channel_ids)
for program in root.iterfind("programme"):
if not channels_to_find: break
channel_id_str = program.get("channel")
if channel_id_str in channels_to_find:
try:
root = ET.fromstring(xml_string)
self.channel_map = {}
for channel in root.iterfind("channel"):
display_name = channel.findtext("display-name")
channel_id = channel.get("id")
icon_tag = channel.find("icon")
icon_url = icon_tag.get("src") if icon_tag is not None else None
if display_name and channel_id:
slug_name = slugify(display_name)
self.channel_map[slug_name] = {"id": channel_id, "name": display_name, "logo_url": icon_url}
if not self.channel_map:
raise ConfigEntryNotReady("XML was fetched, but no channels could be mapped.")
_LOGGER.info("Successfully built channel map with %d entries.", len(self.channel_map))
except ET.ParseError as e:
_LOGGER.error("Failed to parse XML for channel map: %s", e)
raise ConfigEntryNotReady(f"Failed to parse XML for channel map: {e}") from e
def _get_channel_details_from_stream_name(self, stream_name: str) -> dict | None:
"""(REWRITTEN) Match a stream name to a channel in the map, preferring the longest match."""
if not stream_name:
return None
simple_stream_name = slugify(re.sub(r'^\w+:\s*|\s+HD$', '', stream_name, flags=re.IGNORECASE))
_LOGGER.debug("Attempting to match simplified stream name: '%s'", simple_stream_name)
# 1. Try for a direct, exact match first (most reliable)
if simple_stream_name in self.channel_map:
_LOGGER.debug("Found exact match for '%s'", simple_stream_name)
return self.channel_map[simple_stream_name]
# 2. If no exact match, find all possible substring matches
possible_matches = []
for slug_key, details in self.channel_map.items():
if slug_key in simple_stream_name:
possible_matches.append((slug_key, details))
# 3. If any matches were found, sort them by length and return the longest one
if possible_matches:
_LOGGER.debug("Found possible matches: %s", [m[0] for m in possible_matches])
# Sort by the length of the key (item[0]), descending, and return the details of the best match
best_match = sorted(possible_matches, key=lambda item: len(item[0]), reverse=True)[0]
_LOGGER.debug("Selected best match: '%s'", best_match[0])
return best_match[1]
_LOGGER.debug("Could not find any match for stream name: '%s'", stream_name)
return None
async def _async_update_data(self):
"""Update data by fetching from authenticated endpoints."""
status_data = await self._api_request("GET", f"{self.base_url}/proxy/ts/status")
active_streams = status_data.get("channels", [])
if not active_streams: return {}
xml_string = await self._api_request("GET", f"{self.base_url}/output/epg", is_json=False)
try:
root = ET.fromstring(xml_string)
except ET.ParseError as e:
_LOGGER.error("Could not parse EPG XML on update: %s", e)
return self.data
enriched_streams = {}
now = datetime.now(timezone.utc)
for stream in active_streams:
stream_uuid = stream.get("channel_id")
stream_name = stream.get("stream_name")
if not stream_uuid or not stream_name: continue
details = self._get_channel_details_from_stream_name(stream_name)
enriched_stream = stream.copy()
if details:
xmltv_id = details["id"]
enriched_stream["xmltv_id"] = xmltv_id
enriched_stream["channel_name"] = details["name"]
enriched_stream["logo_url"] = details.get("logo_url")
for program in root.iterfind(f".//programme[@channel='{xmltv_id}']"):
start_str, stop_str = program.get("start"), program.get("stop")
if start_str and stop_str:
try:
@@ -137,47 +195,16 @@ class DispatcharrDataUpdateCoordinator(DataUpdateCoordinator):
stop_time = datetime.strptime(stop_str, "%Y%m%d%H%M%S %z")
if start_time <= now < stop_time:
episode_num_tag = program.find("episode-num[@system='onscreen']")
found_programs[channel_id_str] = {
"title": program.findtext("title"),
"description": program.findtext("desc"),
"start_time": start_time.isoformat(),
"end_time": stop_time.isoformat(),
enriched_stream["program"] = {
"title": program.findtext("title"), "description": program.findtext("desc"),
"start_time": start_time.isoformat(), "end_time": stop_time.isoformat(),
"subtitle": program.findtext("sub-title"),
"episode_num": episode_num_tag.text if episode_num_tag is not None else None,
}
channels_to_find.remove(channel_id_str)
except (ValueError, TypeError): continue
return found_programs
except (UpdateFailed, ET.ParseError) as e:
_LOGGER.warning("Could not get or parse EPG XML file, program info will be unavailable: %s", e)
return {}
break
except (ValueError, TypeError):
continue
async def _async_update_data(self):
"""Update data via authenticated API calls."""
status_data = await self._api_request("GET", f"{self.base_url}/proxy/ts/status")
active_streams = status_data.get("channels", [])
if not active_streams: return {}
current_programs_map = {}
if self.config_entry.options.get("enable_epg", True):
active_numeric_ids = list(set([
str(int(details['channel_number']))
for stream in active_streams
if (details := self.channel_details.get(stream['channel_id'])) and details.get('channel_number') is not None
]))
if active_numeric_ids:
current_programs_map = await self._get_current_programs_from_xml(active_numeric_ids)
enriched_streams = {}
for stream in active_streams:
stream_uuid = stream['channel_id']
enriched_stream = stream.copy()
details = self.channel_details.get(stream_uuid)
if details:
if logo_id := details.get("logo_id"):
enriched_stream["logo_url"] = f"{self.base_url}/api/channels/logos/{logo_id}/cache/"
if numeric_id_float := details.get("channel_number"):
numeric_id_str = str(int(numeric_id_float))
enriched_stream["program"] = current_programs_map.get(numeric_id_str)
enriched_streams[stream_uuid] = enriched_stream
return enriched_streams