Add stream catalog and active session entities

This commit is contained in:
2026-04-10 19:20:13 +02:00
parent 8cf2447667
commit 3ac86ddce4
5 changed files with 337 additions and 67 deletions

View File

@@ -1,9 +1,9 @@
"""The Dispatcharr integration."""
import logging
from datetime import timedelta, datetime, timezone
import xml.etree.ElementTree as ET
import re
import json
import aiohttp
from homeassistant.core import HomeAssistant
@@ -17,7 +17,8 @@ from homeassistant.util import slugify
from .const import DOMAIN
_LOGGER = logging.getLogger(__name__)
PLATFORMS = [Platform.SENSOR, Platform.MEDIA_PLAYER]
PLATFORMS = [Platform.SENSOR, Platform.MEDIA_PLAYER, Platform.BUTTON]
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Set up Dispatcharr from a config entry."""
@@ -38,6 +39,7 @@ async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
hass.data[DOMAIN].pop(entry.entry_id)
return unload_ok
class DispatcharrDataUpdateCoordinator(DataUpdateCoordinator):
"""Manages fetching and coordinating Dispatcharr data."""
@@ -47,6 +49,10 @@ class DispatcharrDataUpdateCoordinator(DataUpdateCoordinator):
self.websession = async_get_clientsession(hass)
self._access_token: str | None = None
self.channel_map: dict = {}
self.stream_catalog: dict[str, dict] = {}
self.stream_catalog_count: int = 0
self.stream_catalog_active_count: int = 0
self._last_catalog_refresh: datetime | None = None
super().__init__(
hass, _LOGGER, name=DOMAIN, update_interval=timedelta(seconds=30)
@@ -72,7 +78,9 @@ class DispatcharrDataUpdateCoordinator(DataUpdateCoordinator):
tokens = await response.json()
self._access_token = tokens.get("access")
if not self._access_token:
raise ConfigEntryNotReady("Authentication successful, but no access token received.")
raise ConfigEntryNotReady(
"Authentication successful, but no access token received."
)
_LOGGER.info("Successfully authenticated with Dispatcharr")
except aiohttp.ClientError as err:
_LOGGER.error("Authentication failed: %s", err)
@@ -86,25 +94,115 @@ class DispatcharrDataUpdateCoordinator(DataUpdateCoordinator):
headers = {"Authorization": f"Bearer {self._access_token}"}
try:
response = await self.websession.request(method, url, headers=headers, **kwargs)
if response.status == 401:
_LOGGER.info("Access token expired or invalid, requesting a new one.")
await self._get_new_token()
headers["Authorization"] = f"Bearer {self._access_token}"
response = await self.websession.request(method, url, headers=headers, **kwargs)
async with self.websession.request(
method, url, headers=headers, **kwargs
) as response:
if response.status == 401:
_LOGGER.info(
"Access token expired or invalid, requesting a new one."
)
await self._get_new_token()
headers["Authorization"] = f"Bearer {self._access_token}"
async with self.websession.request(
method, url, headers=headers, **kwargs
) as retry_response:
retry_response.raise_for_status()
return (
await retry_response.json()
if is_json
else await retry_response.text()
)
response.raise_for_status()
return await response.json() if is_json else await response.text()
response.raise_for_status()
return await response.json() if is_json else await response.text()
except aiohttp.ClientError as err:
raise UpdateFailed(f"API request to {url} failed: {err}") from err
async def api_request(
self, method: str, endpoint: str, is_json: bool = True, **kwargs
):
"""Public API helper supporting relative endpoints."""
url = endpoint if endpoint.startswith("http") else f"{self.base_url}{endpoint}"
return await self._api_request(method, url, is_json=is_json, **kwargs)
def _normalize_stream_name(self, name: str) -> str:
"""Normalize stream name for fuzzy cross-endpoint matching."""
return slugify(re.sub(r"^\w+:\s*|\s+HD$", "", name or "", flags=re.IGNORECASE))
async def _async_fetch_stream_catalog(self) -> None:
"""Fetch stream list from Dispatcharr streams endpoint."""
all_streams: list[dict] = []
page = 1
page_size = 500
max_pages = 40
while page <= max_pages:
payload = await self.api_request(
"GET", f"/api/channels/streams/?page={page}&page_size={page_size}"
)
if not isinstance(payload, dict):
break
results = payload.get("results", [])
if not isinstance(results, list):
break
all_streams.extend(results)
if not payload.get("next"):
break
page += 1
catalog: dict[str, dict] = {}
active_count = 0
for stream in all_streams:
if not isinstance(stream, dict):
continue
name = stream.get("name")
if not name:
continue
normalized = self._normalize_stream_name(name)
if not normalized:
continue
current_viewers = stream.get("current_viewers") or 0
if isinstance(current_viewers, int) and current_viewers > 0:
active_count += 1
existing = catalog.get(normalized)
if existing is None:
catalog[normalized] = stream
else:
existing_viewers = existing.get("current_viewers") or 0
if (
isinstance(current_viewers, int)
and current_viewers > existing_viewers
):
catalog[normalized] = stream
self.stream_catalog = catalog
self.stream_catalog_count = len(all_streams)
self.stream_catalog_active_count = active_count
self._last_catalog_refresh = datetime.now(timezone.utc)
def _get_catalog_stream(self, stream_name: str) -> dict | None:
"""Best-effort stream lookup from /api/channels/streams."""
if not stream_name:
return None
return self.stream_catalog.get(self._normalize_stream_name(stream_name))
async def async_populate_channel_map_from_xml(self):
"""Fetch the XML file once to build a reliable map of channels."""
_LOGGER.info("Populating Dispatcharr channel map from XML file...")
try:
xml_string = await self._api_request("GET", f"{self.base_url}/output/epg", is_json=False)
xml_string = await self._api_request(
"GET", f"{self.base_url}/output/epg", is_json=False
)
except UpdateFailed as err:
raise ConfigEntryNotReady(f"Could not fetch EPG XML file to build channel map: {err}") from err
raise ConfigEntryNotReady(
f"Could not fetch EPG XML file to build channel map: {err}"
) from err
try:
root = ET.fromstring(xml_string)
@@ -117,23 +215,35 @@ class DispatcharrDataUpdateCoordinator(DataUpdateCoordinator):
if display_name and channel_id:
slug_name = slugify(display_name)
self.channel_map[slug_name] = {"id": channel_id, "name": display_name, "logo_url": icon_url}
self.channel_map[slug_name] = {
"id": channel_id,
"name": display_name,
"logo_url": icon_url,
}
if not self.channel_map:
raise ConfigEntryNotReady("XML was fetched, but no channels could be mapped.")
raise ConfigEntryNotReady(
"XML was fetched, but no channels could be mapped."
)
_LOGGER.info("Successfully built channel map with %d entries.", len(self.channel_map))
_LOGGER.info(
"Successfully built channel map with %d entries.", len(self.channel_map)
)
except ET.ParseError as e:
_LOGGER.error("Failed to parse XML for channel map: %s", e)
raise ConfigEntryNotReady(f"Failed to parse XML for channel map: {e}") from e
raise ConfigEntryNotReady(
f"Failed to parse XML for channel map: {e}"
) from e
def _get_channel_details_from_stream_name(self, stream_name: str) -> dict | None:
"""(REWRITTEN) Match a stream name to a channel in the map, preferring the longest match."""
if not stream_name:
return None
simple_stream_name = slugify(re.sub(r'^\w+:\s*|\s+HD$', '', stream_name, flags=re.IGNORECASE))
_LOGGER.debug("Attempting to match simplified stream name: '%s'", simple_stream_name)
simple_stream_name = self._normalize_stream_name(stream_name)
_LOGGER.debug(
"Attempting to match simplified stream name: '%s'", simple_stream_name
)
# 1. Try for a direct, exact match first (most reliable)
if simple_stream_name in self.channel_map:
@@ -148,9 +258,13 @@ class DispatcharrDataUpdateCoordinator(DataUpdateCoordinator):
# 3. If any matches were found, sort them by length and return the longest one
if possible_matches:
_LOGGER.debug("Found possible matches: %s", [m[0] for m in possible_matches])
_LOGGER.debug(
"Found possible matches: %s", [m[0] for m in possible_matches]
)
# Sort by the length of the key (item[0]), descending, and return the details of the best match
best_match = sorted(possible_matches, key=lambda item: len(item[0]), reverse=True)[0]
best_match = sorted(
possible_matches, key=lambda item: len(item[0]), reverse=True
)[0]
_LOGGER.debug("Selected best match: '%s'", best_match[0])
return best_match[1]
@@ -159,11 +273,21 @@ class DispatcharrDataUpdateCoordinator(DataUpdateCoordinator):
async def _async_update_data(self):
"""Update data by fetching from authenticated endpoints."""
now = datetime.now(timezone.utc)
if (
self._last_catalog_refresh is None
or now - self._last_catalog_refresh > timedelta(minutes=10)
):
await self._async_fetch_stream_catalog()
status_data = await self._api_request("GET", f"{self.base_url}/proxy/ts/status")
active_streams = status_data.get("channels", [])
if not active_streams: return {}
if not active_streams:
return {}
xml_string = await self._api_request("GET", f"{self.base_url}/output/epg", is_json=False)
xml_string = await self._api_request(
"GET", f"{self.base_url}/output/epg", is_json=False
)
try:
root = ET.fromstring(xml_string)
except ET.ParseError as e:
@@ -171,16 +295,31 @@ class DispatcharrDataUpdateCoordinator(DataUpdateCoordinator):
return self.data
enriched_streams = {}
now = datetime.now(timezone.utc)
for stream in active_streams:
stream_uuid = stream.get("channel_id")
stream_name = stream.get("stream_name")
if not stream_uuid or not stream_name: continue
if not stream_uuid or not stream_name:
continue
details = self._get_channel_details_from_stream_name(stream_name)
enriched_stream = stream.copy()
catalog_stream = self._get_catalog_stream(stream_name)
if catalog_stream:
enriched_stream["catalog_stream_id"] = catalog_stream.get("id")
enriched_stream["catalog_tvg_id"] = catalog_stream.get("tvg_id")
enriched_stream["catalog_current_viewers"] = catalog_stream.get(
"current_viewers"
)
enriched_stream["stream_stats"] = catalog_stream.get("stream_stats")
enriched_stream["stream_profile_id"] = catalog_stream.get(
"stream_profile_id"
)
enriched_stream["stream_chno"] = catalog_stream.get("stream_chno")
if not enriched_stream.get("logo_url"):
enriched_stream["logo_url"] = catalog_stream.get("logo_url")
if details:
xmltv_id = details["id"]
enriched_stream["xmltv_id"] = xmltv_id
@@ -194,12 +333,18 @@ class DispatcharrDataUpdateCoordinator(DataUpdateCoordinator):
start_time = datetime.strptime(start_str, "%Y%m%d%H%M%S %z")
stop_time = datetime.strptime(stop_str, "%Y%m%d%H%M%S %z")
if start_time <= now < stop_time:
episode_num_tag = program.find("episode-num[@system='onscreen']")
episode_num_tag = program.find(
"episode-num[@system='onscreen']"
)
enriched_stream["program"] = {
"title": program.findtext("title"), "description": program.findtext("desc"),
"start_time": start_time.isoformat(), "end_time": stop_time.isoformat(),
"title": program.findtext("title"),
"description": program.findtext("desc"),
"start_time": start_time.isoformat(),
"end_time": stop_time.isoformat(),
"subtitle": program.findtext("sub-title"),
"episode_num": episode_num_tag.text if episode_num_tag is not None else None,
"episode_num": episode_num_tag.text
if episode_num_tag is not None
else None,
}
break
except (ValueError, TypeError):

View File

@@ -0,0 +1,40 @@
"""Button platform for Dispatcharr."""
import logging
from homeassistant.components.button import ButtonEntity
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.entity import DeviceInfo
from .const import DOMAIN
_LOGGER = logging.getLogger(__name__)
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry, async_add_entities: AddEntitiesCallback) -> None:
"""Set up the button platform."""
coordinator = hass.data[DOMAIN][entry.entry_id]
entities = [
DispatcharrButton(coordinator, "Sync Playlists (M3U)", "mdi:playlist-refresh", "/api/m3u/refresh/"),
DispatcharrButton(coordinator, "Sync EPG (Guide)", "mdi:television-guide", "/api/epg/import/"),
DispatcharrButton(coordinator, "Rehash Streams", "mdi:refresh-circle", "/api/core/rehash-streams/"),
]
async_add_entities(entities)
class DispatcharrButton(ButtonEntity):
"""Representation of a Dispatcharr maintenance button."""
def __init__(self, coordinator, name, icon, api_endpoint):
self.coordinator = coordinator
self._attr_name = name
self._attr_icon = icon
self._api_endpoint = api_endpoint
self._attr_unique_id = f"{coordinator.config_entry.entry_id}_{api_endpoint}"
self._attr_device_info = DeviceInfo(identifiers={(DOMAIN, coordinator.config_entry.entry_id)}, name="Dispatcharr")
async def async_press(self) -> None:
"""Handle the button press."""
try:
await self.coordinator.api_request("POST", self._api_endpoint)
_LOGGER.info("Triggered %s successfully", self._attr_name)
except Exception as e:
_LOGGER.error("Failed to trigger %s: %s", self._attr_name, e)

View File

@@ -1,5 +1,3 @@
"""Constants for the Dispatcharr Sensor integration."""
DOMAIN = "dispatcharr_sensor"
# Add BUTTON and MEDIA_PLAYER explicitly
PLATFORMS = ["sensor", "media_player", "button"]

View File

@@ -1,6 +1,6 @@
"""Media Player platform for Dispatcharr."""
import logging
import re
from homeassistant.core import HomeAssistant, callback
from homeassistant.config_entries import ConfigEntry
@@ -19,6 +19,7 @@ from .const import DOMAIN
_LOGGER = logging.getLogger(__name__)
async def async_setup_entry(
hass: HomeAssistant,
config_entry: ConfigEntry,
@@ -28,12 +29,15 @@ async def async_setup_entry(
coordinator = hass.data[DOMAIN][config_entry.entry_id]
DispatcharrStreamManager(coordinator, async_add_entities)
class DispatcharrStreamManager:
"""Manages the creation and removal of media_player entities."""
def __init__(self, coordinator, async_add_entities):
self._coordinator = coordinator
self._async_add_entities = async_add_entities
self._known_stream_ids = set()
self._entities = {}
self._coordinator.async_add_listener(self._update_entities)
@callback
@@ -45,12 +49,26 @@ class DispatcharrStreamManager:
new_stream_ids = current_stream_ids - self._known_stream_ids
if new_stream_ids:
new_entities = [DispatcharrStreamMediaPlayer(self._coordinator, stream_id) for stream_id in new_stream_ids]
new_entities = [
DispatcharrStreamMediaPlayer(self._coordinator, stream_id)
for stream_id in new_stream_ids
]
self._async_add_entities(new_entities)
for entity in new_entities:
self._entities[entity._stream_id] = entity
self._known_stream_ids.update(new_stream_ids)
stale_stream_ids = self._known_stream_ids - current_stream_ids
for stream_id in stale_stream_ids:
entity = self._entities.pop(stream_id, None)
if entity is not None:
entity.async_remove(force_remove=True)
self._known_stream_ids -= stale_stream_ids
class DispatcharrStreamMediaPlayer(CoordinatorEntity, MediaPlayerEntity):
"""Dispatcharr stream player."""
_attr_should_poll = False
_attr_has_entity_name = True
_attr_device_class = MediaPlayerDeviceClass.TV
@@ -62,22 +80,34 @@ class DispatcharrStreamMediaPlayer(CoordinatorEntity, MediaPlayerEntity):
self._stream_id = stream_id
stream_data = self.coordinator.data.get(self._stream_id) or {}
name = stream_data.get("channel_name", stream_data.get("stream_name", f"Stream {self._stream_id[-6:]}"))
name = stream_data.get(
"channel_name",
stream_data.get("stream_name", f"Stream {self._stream_id[-6:]}"),
)
self._attr_name = name
self._attr_unique_id = f"{coordinator.config_entry.entry_id}_{self._stream_id}"
self._attr_device_info = DeviceInfo(identifiers={(DOMAIN, coordinator.config_entry.entry_id)}, name="Dispatcharr")
self._attr_device_info = DeviceInfo(
identifiers={(DOMAIN, coordinator.config_entry.entry_id)},
name="Dispatcharr",
)
@property
def available(self) -> bool:
return super().available and self.coordinator.data is not None and self._stream_id in self.coordinator.data
return (
super().available
and self.coordinator.data is not None
and self._stream_id in self.coordinator.data
)
async def async_media_stop(self) -> None:
"""Send a command to Dispatcharr to kill this stream."""
_LOGGER.info("Stopping stream %s via API", self._stream_id)
try:
# Uses the DELETE endpoint defined in Swagger
await self.coordinator.api_request("DELETE", f"/proxy/ts/stop/{self._stream_id}")
await self.coordinator.api_request(
"DELETE", f"/proxy/ts/stop/{self._stream_id}"
)
# Optimistic state update
self._attr_state = STATE_IDLE
self.async_write_ha_state()
@@ -97,13 +127,16 @@ class DispatcharrStreamMediaPlayer(CoordinatorEntity, MediaPlayerEntity):
stream_data = self.coordinator.data[self._stream_id]
program_data = stream_data.get("program") or {}
stream_stats = stream_data.get("stream_stats") or {}
self._attr_state = STATE_PLAYING
self._attr_app_name = "Dispatcharr"
self._attr_entity_picture = stream_data.get("logo_url")
self._attr_media_content_type = MediaType.TVSHOW
self._attr_media_series_title = program_data.get("title")
self._attr_media_title = program_data.get("subtitle") or program_data.get("title")
self._attr_media_title = program_data.get("subtitle") or program_data.get(
"title"
)
# Extra attributes
self._attr_extra_state_attributes = {
@@ -111,8 +144,14 @@ class DispatcharrStreamMediaPlayer(CoordinatorEntity, MediaPlayerEntity):
"channel_name": stream_data.get("channel_name"),
"program_description": program_data.get("description"),
"clients": stream_data.get("client_count"),
"resolution": stream_data.get("resolution"),
"video_codec": stream_data.get("video_codec"),
"audio_codec": stream_data.get("audio_codec"),
"catalog_current_viewers": stream_data.get("catalog_current_viewers"),
"catalog_stream_id": stream_data.get("catalog_stream_id"),
"catalog_tvg_id": stream_data.get("catalog_tvg_id"),
"resolution": stream_data.get("resolution")
or stream_stats.get("resolution"),
"video_codec": stream_data.get("video_codec")
or stream_stats.get("video_codec"),
"audio_codec": stream_data.get("audio_codec")
or stream_stats.get("audio_codec"),
}
self.async_write_ha_state()

View File

@@ -1,4 +1,5 @@
"""Sensor platform for Dispatcharr."""
import logging
from homeassistant.core import HomeAssistant, callback
@@ -14,6 +15,7 @@ from . import DispatcharrDataUpdateCoordinator
_LOGGER = logging.getLogger(__name__)
async def async_setup_entry(
hass: HomeAssistant,
config_entry: ConfigEntry,
@@ -23,14 +25,21 @@ async def async_setup_entry(
try:
coordinator = hass.data[DOMAIN][config_entry.entry_id]
except KeyError:
raise PlatformNotReady(f"Coordinator not found for entry {config_entry.entry_id}")
raise PlatformNotReady(
f"Coordinator not found for entry {config_entry.entry_id}"
)
# This platform now ONLY creates the total streams sensor.
async_add_entities([DispatcharrTotalStreamSensor(coordinator)])
async_add_entities(
[
DispatcharrTotalStreamSensor(coordinator),
DispatcharrCatalogStreamsSensor(coordinator),
]
)
class DispatcharrTotalStreamSensor(CoordinatorEntity, SensorEntity):
"""A sensor to show the total number of active Dispatcharr streams."""
_attr_state_class = SensorStateClass.MEASUREMENT
_attr_has_entity_name = True
@@ -39,9 +48,48 @@ class DispatcharrTotalStreamSensor(CoordinatorEntity, SensorEntity):
self._attr_name = "Total Active Streams"
self._attr_unique_id = f"{coordinator.config_entry.entry_id}_total_streams"
self._attr_icon = "mdi:play-network"
self._attr_device_info = DeviceInfo(identifiers={(DOMAIN, coordinator.config_entry.entry_id)}, name="Dispatcharr")
self._attr_device_info = DeviceInfo(
identifiers={(DOMAIN, coordinator.config_entry.entry_id)},
name="Dispatcharr",
)
@callback
def _handle_coordinator_update(self) -> None:
self._attr_native_value = len(self.coordinator.data or {})
active = list((self.coordinator.data or {}).values())
self._attr_extra_state_attributes = {
"active_stream_count": len(active),
"active_stream_names": [
item.get("channel_name") or item.get("stream_name")
for item in active[:50]
if item.get("channel_name") or item.get("stream_name")
],
"catalog_stream_count": self.coordinator.stream_catalog_count,
"catalog_active_stream_count": self.coordinator.stream_catalog_active_count,
}
self.async_write_ha_state()
class DispatcharrCatalogStreamsSensor(CoordinatorEntity, SensorEntity):
"""A sensor showing total known streams from /api/channels/streams."""
_attr_state_class = SensorStateClass.MEASUREMENT
_attr_has_entity_name = True
def __init__(self, coordinator: DispatcharrDataUpdateCoordinator):
super().__init__(coordinator)
self._attr_name = "Known Streams"
self._attr_unique_id = f"{coordinator.config_entry.entry_id}_known_streams"
self._attr_icon = "mdi:playlist-play"
self._attr_device_info = DeviceInfo(
identifiers={(DOMAIN, coordinator.config_entry.entry_id)},
name="Dispatcharr",
)
@callback
def _handle_coordinator_update(self) -> None:
self._attr_native_value = self.coordinator.stream_catalog_count
self._attr_extra_state_attributes = {
"catalog_active_stream_count": self.coordinator.stream_catalog_active_count,
}
self.async_write_ha_state()