#include "graph_engine.h" #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include // Default port type strings (must match qpwgraph) #define DEFAULT_AUDIO_TYPE "32 bit float mono audio" #define DEFAULT_MIDI_TYPE "8 bit raw midi" #define DEFAULT_MIDI2_TYPE "32 bit raw UMP" #define DEFAULT_VIDEO_TYPE "32 bit float RGBA video" #define DEFAULT_OTHER_TYPE "PIPEWIRE_PORT_TYPE" using namespace pwgraph; // ============================================================================ // Pending/sync helpers (ported from qpwgraph_pipewire.cpp) // ============================================================================ static void add_pending(GraphEngine::Object *obj, GraphEngine::PwData &pw) { if (obj->pending_seq == 0) spa_list_append(&pw.pending, &obj->pending_link); obj->pending_seq = pw_core_sync(pw.core, 0, obj->pending_seq); pw.pending_seq = obj->pending_seq; } static void remove_pending(GraphEngine::Object *obj) { if (obj->pending_seq != 0) { spa_list_remove(&obj->pending_link); obj->pending_seq = 0; } } // ============================================================================ // Proxy helpers // ============================================================================ static void destroy_proxy(GraphEngine::Object *obj) { pw_proxy *proxy = (pw_proxy*)obj->proxy; if (proxy) { obj->proxy = nullptr; pw_proxy_destroy(proxy); } if (obj->object_listener.link.next) { spa_hook_remove(&obj->object_listener); spa_zero(obj->object_listener); } if (obj->proxy_listener.link.next) { spa_hook_remove(&obj->proxy_listener); spa_zero(obj->proxy_listener); } remove_pending(obj); if (obj->info && obj->destroy_info) { obj->destroy_info(obj->info); obj->info = nullptr; } } // ============================================================================ // Node events (ported from qpwgraph_pipewire.cpp lines 224-278) // ============================================================================ static void on_node_info(void *data, const struct pw_node_info *info) { auto *obj = static_cast(data); if (!obj) return; info = pw_node_info_update((struct pw_node_info*)obj->info, info); obj->info = (void*)info; auto *nobj = static_cast(obj); if (info && (info->change_mask & PW_NODE_CHANGE_MASK_PROPS)) { if (info->props) { const char *media_name = spa_dict_lookup(info->props, PW_KEY_MEDIA_NAME); if (media_name && strlen(media_name) > 0) nobj->node.media_name = media_name; // NOTE: volume/mute are intentionally NOT read from info->props here. // info->props contains static initial values and is NOT updated when // volume/mute change at runtime. Live state comes from SPA_PARAM_Props // via on_node_param (subscribed below in create_proxy_for_object). // Reading stale props here caused on_node_info to overwrite correct // runtime state back to the initial value (the unmute race condition). // Read additional properties const char *str; str = spa_dict_lookup(info->props, "default.clock.rate"); if (str && strlen(str) > 0) nobj->node.sample_rate = (uint32_t)atoi(str); str = spa_dict_lookup(info->props, "default.clock.quantum"); if (str && strlen(str) > 0) nobj->node.quantum = (uint32_t)atoi(str); // Also try clock.rate (set on some devices) if (nobj->node.sample_rate == 0) { str = spa_dict_lookup(info->props, "clock.rate"); if (str && strlen(str) > 0) nobj->node.sample_rate = (uint32_t)atoi(str); } // Also try api.alsa.rate if (nobj->node.sample_rate == 0) { str = spa_dict_lookup(info->props, "api.alsa.rate"); if (str && strlen(str) > 0) nobj->node.sample_rate = (uint32_t)atoi(str); } str = spa_dict_lookup(info->props, "audio.channels"); if (str) nobj->node.channels = (uint32_t)atoi(str); str = spa_dict_lookup(info->props, "audio.format"); if (str) nobj->node.format = str; str = spa_dict_lookup(info->props, "device.name"); if (str) nobj->node.device_name = str; str = spa_dict_lookup(info->props, "device.bus"); if (str) nobj->node.device_bus = str; str = spa_dict_lookup(info->props, "api.alsa.path"); if (str) nobj->node.api = str; str = spa_dict_lookup(info->props, "priority.driver"); if (str) nobj->node.priority = atoi(str); // Latency: "256/48000" format str = spa_dict_lookup(info->props, "node.latency"); if (str) { uint32_t q = 0, r = 0; if (sscanf(str, "%u/%u", &q, &r) == 2) { if (nobj->node.quantum == 0) nobj->node.quantum = q; if (nobj->node.sample_rate == 0 && r > 0) nobj->node.sample_rate = r; } } // ALSA period size str = spa_dict_lookup(info->props, "api.alsa.period-size"); if (str) nobj->node.rate = (uint32_t)atoi(str); // Fallback: clock.rate if (nobj->node.sample_rate == 0) { str = spa_dict_lookup(info->props, "clock.rate"); if (str) nobj->node.sample_rate = (uint32_t)atoi(str); } // Fallback: api.alsa.rate if (nobj->node.sample_rate == 0) { str = spa_dict_lookup(info->props, "api.alsa.rate"); if (str) nobj->node.sample_rate = (uint32_t)atoi(str); } } } nobj->node.changed = true; nobj->node.ready = true; // Notify so frontend reflects property changes (sample rate, media name, etc.) if (obj->engine_ref) obj->engine_ref->notifyChanged(); } // Parse audio format and Props params static void on_node_param(void *data, int seq, uint32_t id, uint32_t index, uint32_t next, const struct spa_pod *param) { auto *obj = static_cast(data); if (!obj) return; auto *nobj = static_cast(obj); if (param == NULL) return; if (id == SPA_PARAM_Format) { uint32_t media_type, media_subtype; if (spa_format_parse(param, &media_type, &media_subtype) < 0) return; if (media_type == SPA_MEDIA_TYPE_audio && media_subtype == SPA_MEDIA_SUBTYPE_raw) { struct spa_audio_info_raw info; spa_zero(info); if (spa_format_audio_raw_parse(param, &info) >= 0) { if (info.rate > 0) nobj->node.sample_rate = info.rate; if (info.channels > 0) nobj->node.channels = info.channels; nobj->node.format = spa_type_audio_format_to_short_name((uint32_t)info.format); nobj->node.changed = true; } } } else if (id == SPA_PARAM_Props) { // Parse live volume/mute state from Props params. // This is the authoritative source — info->props only has initial/static values. const struct spa_pod_object *pobj = (const struct spa_pod_object *)param; struct spa_pod_prop *prop; SPA_POD_OBJECT_FOREACH(pobj, prop) { switch (prop->key) { case SPA_PROP_volume: { float vol; if (spa_pod_get_float(&prop->value, &vol) == 0) nobj->node.volume = vol; break; } case SPA_PROP_channelVolumes: { // Average channel volumes for display float vols[32]; uint32_t n = spa_pod_copy_array(&prop->value, SPA_TYPE_Float, vols, 32); if (n > 0) { float avg = 0; for (uint32_t i = 0; i < n; i++) avg += vols[i]; nobj->node.volume = avg / n; } break; } case SPA_PROP_mute: { bool m; if (spa_pod_get_bool(&prop->value, &m) == 0) nobj->node.mute = m; break; } default: break; } } nobj->node.changed = true; // Broadcast live volume/mute changes from any source (browser, pulsemixer, etc.) if (obj->engine_ref) obj->engine_ref->notifyChanged(); } } static const struct pw_node_events node_events = { .version = PW_VERSION_NODE_EVENTS, .info = on_node_info, .param = on_node_param, }; // ============================================================================ // Port events // ============================================================================ static void on_port_info(void *data, const struct pw_port_info *info) { auto *obj = static_cast(data); if (!obj) return; info = pw_port_info_update((struct pw_port_info*)obj->info, info); obj->info = (void*)info; } static const struct pw_port_events port_events = { .version = PW_VERSION_PORT_EVENTS, .info = on_port_info, }; // ============================================================================ // Link events // ============================================================================ static void on_link_info(void *data, const struct pw_link_info *info) { auto *obj = static_cast(data); if (!obj) return; info = pw_link_info_update((struct pw_link_info*)obj->info, info); obj->info = (void*)info; } static const struct pw_link_events link_events = { .version = PW_VERSION_LINK_EVENTS, .info = on_link_info, }; // ============================================================================ // Proxy events // ============================================================================ static void on_proxy_removed(void *data) { auto *obj = static_cast(data); if (obj && obj->proxy) { pw_proxy *proxy = (pw_proxy*)obj->proxy; obj->proxy = nullptr; pw_proxy_destroy(proxy); } } static void on_proxy_destroy(void *data) { auto *obj = static_cast(data); if (obj) destroy_proxy(obj); } static const struct pw_proxy_events proxy_events = { .version = PW_VERSION_PROXY_EVENTS, .destroy = on_proxy_destroy, .removed = on_proxy_removed, }; // ============================================================================ // Object proxy creation (adapted from qpwgraph_pipewire::Object::create_proxy) // ============================================================================ static void create_proxy_for_object(GraphEngine::Object *obj, GraphEngine *engine) { if (obj->proxy) return; const char *proxy_type = nullptr; uint32_t version = 0; void (*destroy_info)(void*) = nullptr; const void *events = nullptr; switch (obj->type) { case GraphEngine::Object::ObjNode: proxy_type = PW_TYPE_INTERFACE_Node; version = PW_VERSION_NODE; destroy_info = (void(*)(void*))pw_node_info_free; events = &node_events; break; case GraphEngine::Object::ObjPort: proxy_type = PW_TYPE_INTERFACE_Port; version = PW_VERSION_PORT; destroy_info = (void(*)(void*))pw_port_info_free; events = &port_events; break; case GraphEngine::Object::ObjLink: proxy_type = PW_TYPE_INTERFACE_Link; version = PW_VERSION_LINK; destroy_info = (void(*)(void*))pw_link_info_free; events = &link_events; break; } auto &pw = engine->pwData(); pw_proxy *proxy = (pw_proxy*)pw_registry_bind( pw.registry, obj->id, proxy_type, version, 0); if (proxy) { obj->proxy = proxy; obj->destroy_info = destroy_info; obj->pending_seq = 0; obj->engine_ref = engine; pw_proxy_add_object_listener(proxy, &obj->object_listener, events, obj); pw_proxy_add_listener(proxy, &obj->proxy_listener, &proxy_events, obj); // Subscribe to Format + Props params for nodes if (obj->type == GraphEngine::Object::ObjNode) { uint32_t ids[2] = { SPA_PARAM_Format, SPA_PARAM_Props }; pw_node_subscribe_params((pw_node*)proxy, ids, 2); } } } // ============================================================================ // Core events (sync/error handling) // ============================================================================ static void on_core_done(void *data, uint32_t id, int seq) { auto *engine = static_cast(data); auto &pw = engine->pwData(); if (id == PW_ID_CORE) { pw.last_seq = seq; if (pw.pending_seq == seq) pw_thread_loop_signal(pw.loop, false); } } static void on_core_error(void *data, uint32_t id, int seq, int res, const char *message) { auto *engine = static_cast(data); auto &pw = engine->pwData(); if (id == PW_ID_CORE) { pw.last_res = res; if (res == -EPIPE) pw.error = true; } pw_thread_loop_signal(pw.loop, false); } static const struct pw_core_events core_events = { .version = PW_VERSION_CORE_EVENTS, .info = nullptr, .done = on_core_done, .error = on_core_error, }; // ============================================================================ // Link proxy sync (for connect/disconnect operations) // ============================================================================ static int link_proxy_sync(GraphEngine *engine) { auto &pw = engine->pwData(); if (pw_thread_loop_in_thread(pw.loop)) return 0; pw.pending_seq = pw_proxy_sync((pw_proxy*)pw.core, pw.pending_seq); while (true) { pw_thread_loop_wait(pw.loop); if (pw.error) return pw.last_res; if (pw.pending_seq == pw.last_seq) break; } return 0; } static void on_link_proxy_error(void *data, int seq, int res, const char *message) { int *link_res = (int*)data; *link_res = res; } static const struct pw_proxy_events link_proxy_events = { .version = PW_VERSION_PROXY_EVENTS, .error = on_link_proxy_error, }; // ============================================================================ // Registry events (the main entry point — ported from qpwgraph lines 425-568) // ============================================================================ static void on_registry_global(void *data, uint32_t id, uint32_t permissions, const char *type, uint32_t version, const struct spa_dict *props) { if (!props) return; auto *engine = static_cast(data); if (strcmp(type, PW_TYPE_INTERFACE_Node) == 0) { // Parse node properties (ported from qpwgraph lines 444-489) const char *str = spa_dict_lookup(props, PW_KEY_NODE_DESCRIPTION); const char *nick = spa_dict_lookup(props, PW_KEY_NODE_NICK); if (!str || strlen(str) < 1) str = nick; if (!str || strlen(str) < 1) str = nick = spa_dict_lookup(props, PW_KEY_NODE_NAME); if (!str || strlen(str) < 1) str = "node"; std::string node_name; const char *app = spa_dict_lookup(props, PW_KEY_APP_NAME); if (app && strlen(app) > 0 && strcmp(app, str) != 0) { node_name += app; node_name += '/'; } node_name += str; std::string node_nick = (nick ? nick : str); PortMode node_mode = PortMode::None; uint8_t node_types = 0; str = spa_dict_lookup(props, PW_KEY_MEDIA_CLASS); if (str) { if (strstr(str, "Source") || strstr(str, "Output")) node_mode = PortMode::Output; else if (strstr(str, "Sink") || strstr(str, "Input")) node_mode = PortMode::Input; if (strstr(str, "Audio")) node_types |= (uint8_t)NodeType::Audio; if (strstr(str, "Video")) node_types |= (uint8_t)NodeType::Video; if (strstr(str, "Midi")) node_types |= (uint8_t)NodeType::Midi; } if (node_mode == PortMode::None) { str = spa_dict_lookup(props, PW_KEY_MEDIA_CATEGORY); if (str && strstr(str, "Duplex")) node_mode = PortMode::Duplex; } // Create node object auto *nobj = new GraphEngine::NodeObj(id); nobj->node.id = id; nobj->node.name = node_name; nobj->node.nick = node_nick; nobj->node.mode = node_mode; nobj->node.node_type = NodeType(node_types); nobj->node.mode2 = node_mode; nobj->node.ready = false; nobj->node.changed = false; engine->addObject(id, nobj); create_proxy_for_object(nobj, engine); engine->notifyChanged(); } else if (strcmp(type, PW_TYPE_INTERFACE_Port) == 0) { // Parse port properties (ported from qpwgraph lines 492-535) const char *str = spa_dict_lookup(props, PW_KEY_NODE_ID); const uint32_t node_id = (str ? (uint32_t)atoi(str) : 0); std::string port_name; str = spa_dict_lookup(props, PW_KEY_PORT_ALIAS); if (!str) str = spa_dict_lookup(props, PW_KEY_PORT_NAME); if (!str) str = "port"; port_name = str; auto *node_obj = engine->findNode(node_id); uint32_t port_type = engine->otherPortType(); str = spa_dict_lookup(props, PW_KEY_FORMAT_DSP); if (str) { port_type = hashType(str); } else if (node_obj && (node_obj->node.node_type & NodeType::Video) != NodeType::None) { port_type = engine->videoPortType(); } PortMode port_mode = PortMode::None; str = spa_dict_lookup(props, PW_KEY_PORT_DIRECTION); if (str) { if (strcmp(str, "in") == 0) port_mode = PortMode::Input; else if (strcmp(str, "out") == 0) port_mode = PortMode::Output; } uint8_t port_flags = 0; if (node_obj && (node_obj->node.mode2 != PortMode::Duplex)) port_flags |= (uint8_t)PortFlags::Terminal; str = spa_dict_lookup(props, PW_KEY_PORT_PHYSICAL); if (str && pw_properties_parse_bool(str)) port_flags |= (uint8_t)PortFlags::Physical; str = spa_dict_lookup(props, PW_KEY_PORT_TERMINAL); if (str && pw_properties_parse_bool(str)) port_flags |= (uint8_t)PortFlags::Terminal; str = spa_dict_lookup(props, PW_KEY_PORT_MONITOR); if (str && pw_properties_parse_bool(str)) port_flags |= (uint8_t)PortFlags::Monitor; str = spa_dict_lookup(props, PW_KEY_PORT_CONTROL); if (str && pw_properties_parse_bool(str)) port_flags |= (uint8_t)PortFlags::Control; if (!node_obj) return; // Node not ready yet auto *pobj = new GraphEngine::PortObj(id); pobj->port.id = id; pobj->port.node_id = node_id; pobj->port.name = port_name; pobj->port.mode = port_mode; pobj->port.port_type = port_type; pobj->port.flags = PortFlags(port_flags); // Update node's mode2 if port direction differs if ((node_obj->node.mode2 & port_mode) == PortMode::None) node_obj->node.mode2 = PortMode::Duplex; // Avoid duplicate port IDs in node auto &pids = node_obj->node.port_ids; if (std::find(pids.begin(), pids.end(), id) == pids.end()) { pids.push_back(id); } node_obj->node.changed = true; engine->addObject(id, pobj); create_proxy_for_object(pobj, engine); engine->notifyChanged(); } else if (strcmp(type, PW_TYPE_INTERFACE_Link) == 0) { // Parse link properties (ported from qpwgraph lines 538-545) const char *str = spa_dict_lookup(props, PW_KEY_LINK_OUTPUT_PORT); const uint32_t port1_id = (str ? (uint32_t)pw_properties_parse_int(str) : 0); str = spa_dict_lookup(props, PW_KEY_LINK_INPUT_PORT); const uint32_t port2_id = (str ? (uint32_t)pw_properties_parse_int(str) : 0); // Validate auto *p1 = engine->findPort(port1_id); auto *p2 = engine->findPort(port2_id); if (!p1 || !p2) return; if ((p1->port.mode & PortMode::Output) == PortMode::None) return; if ((p2->port.mode & PortMode::Input) == PortMode::None) return; auto *lobj = new GraphEngine::LinkObj(id); lobj->link.id = id; lobj->link.port1_id = port1_id; lobj->link.port2_id = port2_id; p1->port.link_ids.push_back(id); engine->addObject(id, lobj); create_proxy_for_object(lobj, engine); engine->notifyChanged(); } } static void on_registry_global_remove(void *data, uint32_t id) { auto *engine = static_cast(data); engine->removeObject(id); engine->notifyChanged(); } static const struct pw_registry_events registry_events = { .version = PW_VERSION_REGISTRY_EVENTS, .global = on_registry_global, .global_remove = on_registry_global_remove, }; // ============================================================================ // GraphEngine::Object // ============================================================================ GraphEngine::Object::Object(uint32_t id, Type type) : id(id), type(type), proxy(nullptr), info(nullptr), destroy_info(nullptr), pending_seq(0), engine_ref(nullptr) { spa_zero(proxy_listener); spa_zero(object_listener); spa_list_init(&pending_link); } GraphEngine::Object::~Object() { if (proxy) { destroy_proxy(this); } } // ============================================================================ // GraphEngine // ============================================================================ GraphEngine::GraphEngine() : m_on_change(nullptr), m_running(false) { m_audio_type = hashType(DEFAULT_AUDIO_TYPE); m_midi_type = hashType(DEFAULT_MIDI_TYPE); m_midi2_type = hashType(DEFAULT_MIDI2_TYPE); m_video_type = hashType(DEFAULT_VIDEO_TYPE); m_other_type = hashType(DEFAULT_OTHER_TYPE); memset(&m_pw, 0, sizeof(m_pw)); spa_list_init(&m_pw.pending); } GraphEngine::~GraphEngine() { close(); } bool GraphEngine::open() { std::lock_guard lock(m_mutex); pw_init(nullptr, nullptr); memset(&m_pw, 0, sizeof(m_pw)); spa_list_init(&m_pw.pending); m_pw.pending_seq = 0; m_pw.loop = pw_thread_loop_new("pwweb_loop", nullptr); if (!m_pw.loop) { fprintf(stderr, "pw_thread_loop_new failed\n"); pw_deinit(); return false; } pw_thread_loop_lock(m_pw.loop); struct pw_loop *loop = pw_thread_loop_get_loop(m_pw.loop); m_pw.context = pw_context_new(loop, nullptr, 0); if (!m_pw.context) { fprintf(stderr, "pw_context_new failed\n"); pw_thread_loop_unlock(m_pw.loop); pw_thread_loop_destroy(m_pw.loop); pw_deinit(); return false; } m_pw.core = pw_context_connect(m_pw.context, nullptr, 0); if (!m_pw.core) { fprintf(stderr, "pw_context_connect failed\n"); pw_thread_loop_unlock(m_pw.loop); pw_context_destroy(m_pw.context); pw_thread_loop_destroy(m_pw.loop); pw_deinit(); return false; } pw_core_add_listener(m_pw.core, &m_pw.core_listener, &core_events, this); m_pw.registry = pw_core_get_registry(m_pw.core, PW_VERSION_REGISTRY, 0); pw_registry_add_listener(m_pw.registry, &m_pw.registry_listener, ®istry_events, this); m_pw.pending_seq = 0; m_pw.last_seq = 0; m_pw.last_res = 0; m_pw.error = false; m_running = true; pw_thread_loop_start(m_pw.loop); pw_thread_loop_unlock(m_pw.loop); fprintf(stderr, "pwweb: PipeWire connected\n"); return true; } void GraphEngine::close() { if (!m_pw.loop) return; m_running = false; pw_thread_loop_lock(m_pw.loop); clearObjects(); pw_thread_loop_unlock(m_pw.loop); if (m_pw.loop) pw_thread_loop_stop(m_pw.loop); if (m_pw.registry) { spa_hook_remove(&m_pw.registry_listener); pw_proxy_destroy((pw_proxy*)m_pw.registry); } if (m_pw.core) { spa_hook_remove(&m_pw.core_listener); pw_core_disconnect(m_pw.core); } if (m_pw.context) pw_context_destroy(m_pw.context); if (m_pw.loop) pw_thread_loop_destroy(m_pw.loop); memset(&m_pw, 0, sizeof(m_pw)); pw_deinit(); fprintf(stderr, "pwweb: PipeWire disconnected\n"); } void GraphEngine::setOnChange(ChangeCallback cb) { std::lock_guard lock(m_mutex); m_on_change = std::move(cb); } void GraphEngine::notifyChanged() { // Called from PipeWire thread — invoke callback outside lock ChangeCallback cb; { std::lock_guard lock(m_mutex); cb = m_on_change; } if (cb) cb(); } // ============================================================================ // Object management // ============================================================================ void GraphEngine::addObject(uint32_t id, Object *obj) { // Remove existing object with same ID if any (prevents duplicates) auto it = m_objects_by_id.find(id); if (it != m_objects_by_id.end()) { Object *old = it->second; auto vit = std::find(m_objects.begin(), m_objects.end(), old); if (vit != m_objects.end()) m_objects.erase(vit); delete old; } m_objects_by_id[id] = obj; m_objects.push_back(obj); } GraphEngine::Object *GraphEngine::findObject(uint32_t id) const { auto it = m_objects_by_id.find(id); return (it != m_objects_by_id.end()) ? it->second : nullptr; } void GraphEngine::removeObject(uint32_t id) { auto it = m_objects_by_id.find(id); if (it == m_objects_by_id.end()) return; Object *obj = it->second; m_objects_by_id.erase(it); auto vit = std::find(m_objects.begin(), m_objects.end(), obj); if (vit != m_objects.end()) m_objects.erase(vit); // If it's a port, remove from parent node's port list if (obj->type == Object::ObjPort) { auto *pobj = static_cast(obj); auto *nobj = findNode(pobj->port.node_id); if (nobj) { auto &pids = nobj->node.port_ids; pids.erase(std::remove(pids.begin(), pids.end(), id), pids.end()); nobj->node.changed = true; } // Remove any links involving this port auto link_ids_copy = pobj->port.link_ids; for (uint32_t lid : link_ids_copy) { removeObject(lid); } } // If it's a node, remove all its ports if (obj->type == Object::ObjNode) { auto *nobj = static_cast(obj); auto port_ids_copy = nobj->node.port_ids; for (uint32_t pid : port_ids_copy) { removeObject(pid); } } // If it's a link, remove from output port's link list if (obj->type == Object::ObjLink) { auto *lobj = static_cast(obj); auto *pobj = findPort(lobj->link.port1_id); if (pobj) { auto &lids = pobj->port.link_ids; lids.erase(std::remove(lids.begin(), lids.end(), id), lids.end()); } } delete obj; } void GraphEngine::clearObjects() { for (auto *obj : m_objects) delete obj; m_objects.clear(); m_objects_by_id.clear(); } GraphEngine::NodeObj *GraphEngine::findNode(uint32_t node_id) const { Object *obj = findObject(node_id); return (obj && obj->type == Object::ObjNode) ? static_cast(obj) : nullptr; } GraphEngine::PortObj *GraphEngine::findPort(uint32_t port_id) const { Object *obj = findObject(port_id); return (obj && obj->type == Object::ObjPort) ? static_cast(obj) : nullptr; } GraphEngine::LinkObj *GraphEngine::findLink(uint32_t link_id) const { Object *obj = findObject(link_id); return (obj && obj->type == Object::ObjLink) ? static_cast(obj) : nullptr; } // ============================================================================ // Snapshot (thread-safe graph state) // ============================================================================ GraphEngine::Snapshot GraphEngine::snapshot() const { std::lock_guard lock(m_mutex); Snapshot snap; for (auto *obj : m_objects) { switch (obj->type) { case Object::ObjNode: snap.nodes.push_back(static_cast(obj)->node); break; case Object::ObjPort: snap.ports.push_back(static_cast(obj)->port); break; case Object::ObjLink: snap.links.push_back(static_cast(obj)->link); break; } } return snap; } // ============================================================================ // Connect/Disconnect (ported from qpwgraph_pipewire::connectPorts) // ============================================================================ bool GraphEngine::connectPorts(uint32_t output_port_id, uint32_t input_port_id) { if (!m_pw.loop) return false; pw_thread_loop_lock(m_pw.loop); PortObj *p1 = findPort(output_port_id); PortObj *p2 = findPort(input_port_id); if (!p1 || !p2 || (p1->port.mode & PortMode::Output) == PortMode::None || (p2->port.mode & PortMode::Input) == PortMode::None || p1->port.port_type != p2->port.port_type) { pw_thread_loop_unlock(m_pw.loop); return false; } // Create link via PipeWire (adapted from qpwgraph lines 858-891) char val[4][16]; snprintf(val[0], sizeof(val[0]), "%u", p1->port.node_id); snprintf(val[1], sizeof(val[1]), "%u", p1->id); snprintf(val[2], sizeof(val[2]), "%u", p2->port.node_id); snprintf(val[3], sizeof(val[3]), "%u", p2->id); struct spa_dict props; struct spa_dict_item items[6]; props = SPA_DICT_INIT(items, 0); items[props.n_items++] = SPA_DICT_ITEM_INIT(PW_KEY_LINK_OUTPUT_NODE, val[0]); items[props.n_items++] = SPA_DICT_ITEM_INIT(PW_KEY_LINK_OUTPUT_PORT, val[1]); items[props.n_items++] = SPA_DICT_ITEM_INIT(PW_KEY_LINK_INPUT_NODE, val[2]); items[props.n_items++] = SPA_DICT_ITEM_INIT(PW_KEY_LINK_INPUT_PORT, val[3]); items[props.n_items++] = SPA_DICT_ITEM_INIT(PW_KEY_OBJECT_LINGER, "true"); const char *str = getenv("PIPEWIRE_LINK_PASSIVE"); if (str && pw_properties_parse_bool(str)) items[props.n_items++] = SPA_DICT_ITEM_INIT(PW_KEY_LINK_PASSIVE, "true"); pw_proxy *proxy = (pw_proxy*)pw_core_create_object(m_pw.core, "link-factory", PW_TYPE_INTERFACE_Link, PW_VERSION_LINK, &props, 0); if (proxy) { int link_res = 0; spa_hook listener; spa_zero(listener); pw_proxy_add_listener(proxy, &listener, &link_proxy_events, &link_res); link_proxy_sync(this); spa_hook_remove(&listener); pw_proxy_destroy(proxy); } pw_thread_loop_unlock(m_pw.loop); return (proxy != nullptr); } bool GraphEngine::disconnectPorts(uint32_t output_port_id, uint32_t input_port_id) { if (!m_pw.loop) return false; pw_thread_loop_lock(m_pw.loop); PortObj *p1 = findPort(output_port_id); PortObj *p2 = findPort(input_port_id); if (!p1 || !p2) { pw_thread_loop_unlock(m_pw.loop); return false; } // Find and destroy matching link (adapted from qpwgraph lines 846-853) bool found = false; for (uint32_t lid : p1->port.link_ids) { LinkObj *link = findLink(lid); if (link && link->link.port1_id == output_port_id && link->link.port2_id == input_port_id) { pw_registry_destroy(m_pw.registry, lid); link_proxy_sync(this); found = true; break; } } pw_thread_loop_unlock(m_pw.loop); return found; } // ============================================================================ // Volume control // ============================================================================ bool GraphEngine::setNodeVolume(uint32_t node_id, float volume) { if (!m_pw.loop) return false; pw_thread_loop_lock(m_pw.loop); NodeObj *nobj = findNode(node_id); if (!nobj || !nobj->proxy) { fprintf(stderr, "pwweb: setNodeVolume: node %u not found or no proxy\n", node_id); pw_thread_loop_unlock(m_pw.loop); return false; } // Build Props param with both SPA_PROP_volume and SPA_PROP_channelVolumes. // Hardware ALSA sinks respond to SPA_PROP_volume; browser/app streams use // SPA_PROP_channelVolumes (per-channel). Setting both covers all node types. uint32_t n_ch = (nobj->node.channels > 0 && nobj->node.channels <= 32) ? nobj->node.channels : 2; float ch_vols[32]; for (uint32_t i = 0; i < n_ch; i++) ch_vols[i] = volume; uint8_t buf[1024]; struct spa_pod_builder b = SPA_POD_BUILDER_INIT(buf, sizeof(buf)); struct spa_pod_frame f; spa_pod_builder_push_object(&b, &f, SPA_TYPE_OBJECT_Props, SPA_PARAM_Props); spa_pod_builder_prop(&b, SPA_PROP_volume, 0); spa_pod_builder_float(&b, volume); spa_pod_builder_prop(&b, SPA_PROP_channelVolumes, 0); spa_pod_builder_array(&b, sizeof(float), SPA_TYPE_Float, n_ch, ch_vols); struct spa_pod *param = (struct spa_pod*)spa_pod_builder_pop(&b, &f); int res = pw_node_set_param((pw_node*)nobj->proxy, SPA_PARAM_Props, 0, param); fprintf(stderr, "pwweb: setNodeVolume node=%u vol=%.2f res=%d name=%s\n", node_id, volume, res, nobj->node.name.c_str()); nobj->node.volume = volume; nobj->node.changed = true; pw_thread_loop_unlock(m_pw.loop); return (res >= 0); } bool GraphEngine::setNodeMute(uint32_t node_id, bool mute) { if (!m_pw.loop) return false; pw_thread_loop_lock(m_pw.loop); NodeObj *nobj = findNode(node_id); if (!nobj || !nobj->proxy) { fprintf(stderr, "pwweb: setNodeMute: node %u not found or no proxy\n", node_id); pw_thread_loop_unlock(m_pw.loop); return false; } uint8_t buf[1024]; struct spa_pod_builder b = SPA_POD_BUILDER_INIT(buf, sizeof(buf)); struct spa_pod_frame f; spa_pod_builder_push_object(&b, &f, SPA_TYPE_OBJECT_Props, SPA_PARAM_Props); spa_pod_builder_prop(&b, SPA_PROP_mute, 0); spa_pod_builder_bool(&b, mute); struct spa_pod *param = (struct spa_pod*)spa_pod_builder_pop(&b, &f); int res = pw_node_set_param((pw_node*)nobj->proxy, SPA_PARAM_Props, 0, param); fprintf(stderr, "pwweb: setNodeMute node=%u mute=%d res=%d name=%s\n", node_id, mute, res, nobj->node.name.c_str()); nobj->node.mute = mute; nobj->node.changed = true; pw_thread_loop_unlock(m_pw.loop); return (res >= 0); } // ============================================================================ // Module loading (virtual devices) // ============================================================================ uint32_t GraphEngine::loadModule(const char *name, const char *args) { if (!m_pw.loop || !m_pw.context) return 0; pw_thread_loop_lock(m_pw.loop); struct pw_impl_module *mod = pw_context_load_module(m_pw.context, name, args, nullptr); uint32_t id = 0; if (!mod) { fprintf(stderr, "pwweb: failed to load module %s\n", name); } else { struct pw_global *global = pw_impl_module_get_global(mod); if (global) { id = pw_global_get_id(global); } fprintf(stderr, "pwweb: loaded module %s (id=%u)\n", name, id); } pw_thread_loop_unlock(m_pw.loop); return id; } bool GraphEngine::unloadModule(uint32_t module_id) { if (!m_pw.loop || !m_pw.registry) return false; pw_thread_loop_lock(m_pw.loop); pw_registry_destroy(m_pw.registry, module_id); pw_thread_loop_unlock(m_pw.loop); fprintf(stderr, "pwweb: unloaded module id=%u\n", module_id); return true; } // end of graph_engine.cpp