# Source code for tg_model.execution.graph_compiler

"""Compile a DependencyGraph from a ConfiguredModel's authored semantics.

Walks the configured topology, inspects authored expressions and constraints,
and builds the bipartite dependency graph automatically.

Edges are kept so compute nodes depend on **value** nodes (with ``slot_id``);
``Evaluator._check_dependencies_ready`` only inspects those value dependencies.
The only public entry point is ``compile_graph()``. All other functions
are internal compilation helpers.
"""

from __future__ import annotations

from collections.abc import Callable
from typing import Any

from unitflow import Quantity

from tg_model.execution.configured_model import ConfiguredModel
from tg_model.execution.dependency_graph import DependencyGraph, DependencyNode, NodeKind
from tg_model.execution.external_ops import (
    ExternalOpsError,
    materialize_external_result,
)
from tg_model.execution.external_ops import (
    resolve_attribute_ref_to_slot as _resolve_attr_ref_core,
)
from tg_model.execution.instances import ElementInstance, PartInstance, RequirementPackageInstance
from tg_model.execution.value_slots import ValueSlot
from tg_model.integrations.external_compute import (
    ExternalComputeBinding,
    ExternalComputeResult,
    assert_sync_external,
)
from tg_model.model.refs import AttributeRef


def _alloc_target_as_part(target: ElementInstance, *, where: str) -> PartInstance:
    """Return ``target`` unchanged when it is a :class:`PartInstance`.

    Parameters
    ----------
    target : ElementInstance
        Allocation target to narrow.
    where : str
        Prefix for the error message identifying the offending allocation.

    Raises
    ------
    GraphCompilationError
        If ``target`` is not a :class:`PartInstance`.
    """
    if isinstance(target, PartInstance):
        return target
    actual_type_name = type(target).__name__
    raise GraphCompilationError(f"{where}: allocation target must be PartInstance, got {actual_type_name}")


def _first_value_slot_under_requirement_package(
    pkg: RequirementPackageInstance,
) -> ValueSlot | None:
    """First package :class:`ValueSlot` in stable name order (including nested packages)."""
    for key in sorted(pkg._members.keys()):
        m = pkg._members[key]
        if isinstance(m, ValueSlot):
            return m
        if isinstance(m, RequirementPackageInstance):
            inner = _first_value_slot_under_requirement_package(m)
            if inner is not None:
                return inner
    return None


class GraphCompilationError(Exception):
    """Raised when graph compilation cannot resolve symbols, slots, or bindings."""


def compile_graph(model: ConfiguredModel) -> tuple[DependencyGraph, dict[str, Callable]]:
    """Compile dependency graph and per-node compute handlers from a configured model.

    This is the **only** supported public entry point for graph compilation.

    Parameters
    ----------
    model : ConfiguredModel
        Frozen topology from :func:`~tg_model.execution.configured_model.instantiate`.

    Returns
    -------
    graph : DependencyGraph
        Bipartite value/compute graph in topological-evaluable form.
    handlers : dict[str, Callable]
        Sync callables keyed by compute ``node_id`` (expressions, roll-ups, externals, constraints).

    Raises
    ------
    GraphCompilationError
        On unresolvable references, binding errors, or other compile failures.

    Notes
    -----
    Walks value slots, requirement acceptance, constraints, solve groups, and external nodes.
    Async externals are still scheduled from sync
    :meth:`~tg_model.execution.evaluator.Evaluator.evaluate_async`.

    Successful results are **cached** on ``model._compiled_graph`` so repeated calls and
    :meth:`~tg_model.execution.configured_model.ConfiguredModel.evaluate` reuse the same
    ``(graph, handlers)`` tuple without recompilation.
    """
    # Fast path: a previous successful compilation is stored on the model itself.
    cached = getattr(model, "_compiled_graph", None)
    if cached is not None:
        return cached

    graph = DependencyGraph()
    handlers: dict[str, Callable] = {}

    # Allocation-derived lookups are built once and threaded through the package walk.
    param_overrides_by_pkg = _build_param_overrides_by_pkg(model)
    allocation_target_by_pkg = _build_allocation_target_by_pkg(model)

    _compile_part(model.root, graph, handlers, model)
    _compile_requirement_packages_from_parts(
        model.root, graph, handlers, model, param_overrides_by_pkg, allocation_target_by_pkg
    )
    _compile_constraints_for_part(model.root, graph, handlers, model)
    _compile_solve_groups_for_part(model.root, graph, handlers, model)

    # Only cache after every stage succeeded; return the very tuple that is cached so
    # the first call and later calls hand back the same object.
    compiled = (graph, handlers)
    model._compiled_graph = compiled
    return compiled


def _compile_part(
    part: PartInstance,
    graph: DependencyGraph,
    handlers: dict[str, Callable],
    model: ConfiguredModel,
) -> None:
    """Compile ``part``'s value slots and external bindings, then recurse into its children."""
    for slot in part.value_slots:
        _compile_slot(slot, part, graph, handlers, model)
    _compile_external_for_part(part, graph, handlers, model)
    for child in part.children:
        _compile_part(child, graph, handlers, model)


def _compile_slot(
    slot: ValueSlot,
    owner: PartInstance,
    graph: DependencyGraph,
    handlers: dict[str, Callable],
    model: ConfiguredModel,
) -> None:
    """Add the graph node(s) and handler for a single value slot.

    Cases, in order:

    * parameter slot: one ``INPUT_PARAMETER`` node, no handler;
    * no ``_expr`` metadata: one ``ATTRIBUTE_VALUE`` node, no handler;
    * ``_expr`` is a ``RollupDecl``: delegated to :func:`_compile_rollup`;
    * otherwise: an ``ATTRIBUTE_VALUE`` node ``val:<path>`` plus a ``LOCAL_EXPRESSION``
      node ``expr:<path>`` with an edge from the expression node to the value node, and
      a handler chosen by the shape of the expression (see inline comments).

    Symbols and attribute refs are resolved relative to ``owner``.
    """
    slot_node_id = f"val:{slot.path_string}"

    if slot.is_parameter:
        graph.add_node(
            DependencyNode(
                slot_node_id,
                NodeKind.INPUT_PARAMETER,
                slot_id=slot.stable_id,
            )
        )
        return

    expr = slot.metadata.get("_expr")
    if expr is None:
        graph.add_node(
            DependencyNode(
                slot_node_id,
                NodeKind.ATTRIBUTE_VALUE,
                slot_id=slot.stable_id,
            )
        )
        return

    from tg_model.model.declarations.values import RollupDecl

    if isinstance(expr, RollupDecl):
        _compile_rollup(slot, slot_node_id, expr, owner, graph, handlers)
        return

    graph.add_node(
        DependencyNode(
            slot_node_id,
            NodeKind.ATTRIBUTE_VALUE,
            slot_id=slot.stable_id,
        )
    )

    expr_node_id = f"expr:{slot.path_string}"
    graph.add_node(
        DependencyNode(
            expr_node_id,
            NodeKind.LOCAL_EXPRESSION,
            slot_id=slot.stable_id,
        )
    )
    graph.add_edge(expr_node_id, slot_node_id)

    # A bare AttributeRef is a pass-through: the slot takes the referenced slot's value.
    if isinstance(expr, AttributeRef):
        dep_slot = _resolve_attribute_ref_to_slot(expr, owner, model)
        dep_node_id = f"val:{dep_slot.path_string}"
        graph.add_edge(dep_node_id, expr_node_id)

        def make_ref_passthrough_handler(dnid: str) -> Callable[..., Any]:
            def handler(dep_values: dict[str, Any]) -> Any:
                return dep_values[dnid]

            return handler

        handlers[expr_node_id] = make_ref_passthrough_handler(dep_node_id)
        return

    if hasattr(expr, "free_symbols") and expr.free_symbols:
        # Symbolic expression: one incoming edge per free symbol.
        for sym in expr.free_symbols:
            dep_slot = _resolve_symbol_to_slot(sym, owner, model)
            dep_node_id = f"val:{dep_slot.path_string}"
            graph.add_edge(dep_node_id, expr_node_id)

        def make_expr_handler(expression: Any, owner_part: PartInstance, cm: ConfiguredModel) -> Callable:
            def handler(dep_values: dict[str, Any]) -> Any:
                # Symbols are re-resolved on every call; symbols whose value node is
                # absent from dep_values are simply left out of the context.
                context = {}
                for sym in expression.free_symbols:
                    dep_slot = _resolve_symbol_to_slot(sym, owner_part, cm)
                    dep_node_id = f"val:{dep_slot.path_string}"
                    if dep_node_id in dep_values:
                        context[sym] = dep_values[dep_node_id]
                return expression.evaluate(context)

            return handler

        handlers[expr_node_id] = make_expr_handler(expr, owner, model)
    elif hasattr(expr, "evaluate"):
        # Symbol-free expression object: evaluated with an empty context.
        handlers[expr_node_id] = lambda dep_values, e=expr: e.evaluate({})
    elif callable(expr):
        # Plain callable: receives the dependency-value mapping as-is.
        handlers[expr_node_id] = lambda dep_values, fn=expr: fn(dep_values)
    else:
        # Anything else is treated as a constant value.
        handlers[expr_node_id] = lambda dep_values, val=expr: val


def _resolve_attribute_ref_to_slot(
    ref: Any,
    owner: PartInstance,
    model: ConfiguredModel,
) -> ValueSlot:
    """Resolve ``ref`` via the external-ops resolver, re-raising its error as a compile error.

    Raises
    ------
    GraphCompilationError
        Wrapping any :class:`ExternalOpsError` raised by the underlying resolver.
    """
    try:
        return _resolve_attr_ref_core(ref, owner, model)
    except ExternalOpsError as e:
        raise GraphCompilationError(str(e)) from e


def _compile_external_for_part(
    part: PartInstance,
    graph: DependencyGraph,
    handlers: dict[str, Callable],
    model: ConfiguredModel,
) -> None:
    """Build one external-compute node per distinct ``_computed_by`` binding on ``part``.

    Slots are grouped by the identity (``id``) of their binding object.

    Raises
    ------
    GraphCompilationError
        If ``_computed_by`` is not an :class:`ExternalComputeBinding`, or a slot carries
        both ``_expr`` and ``_computed_by``.
    """
    groups: dict[int, list[ValueSlot]] = {}
    for slot in part.value_slots:
        cb = slot.metadata.get("_computed_by")
        if cb is None:
            continue
        if not isinstance(cb, ExternalComputeBinding):
            raise GraphCompilationError(
                f"computed_by must be an ExternalComputeBinding at '{slot.path_string}', got {type(cb).__name__}"
            )
        if slot.metadata.get("_expr") is not None:
            raise GraphCompilationError(f"Attribute '{slot.path_string}' cannot combine expr= with computed_by=")
        groups.setdefault(id(cb), []).append(slot)

    for slots in groups.values():
        _build_external_compute_node(slots, part, graph, handlers, model)


def _build_external_compute_node(
    slots: list[ValueSlot],
    owner: PartInstance,
    graph: DependencyGraph,
    handlers: dict[str, Callable],
    model: ConfiguredModel,
) -> None:
    """Add the ``EXTERNAL_COMPUTATION`` node, its edges, and its handler for one binding.

    ``slots`` are the attributes on ``owner`` that share the same binding object. When the
    binding has ``output_routes``, the routed slots must be exactly ``slots``; when it has
    none, exactly one slot is allowed.

    Raises
    ------
    GraphCompilationError
        On binding identity mismatch, slot/route mismatch, a duplicate node id, or an
        unresolvable input/output reference.
    """
    binding: ExternalComputeBinding = slots[0].metadata["_computed_by"]
    for s in slots[1:]:
        if s.metadata.get("_computed_by") is not binding:
            raise GraphCompilationError("External binding identity mismatch within compute group")

    routes = binding.output_routes
    slot_ids = {s.stable_id for s in slots}

    if routes is None:
        if len(slots) != 1:
            raise GraphCompilationError(
                "Single-slot ExternalComputeBinding (output_routes is None) requires exactly "
                "one attribute with that binding on this part"
            )
        output_slot_ids = [slots[0].stable_id]
    else:
        resolved_by_key: dict[str, ValueSlot] = {}
        for key, ref in routes.items():
            resolved_by_key[key] = _resolve_attribute_ref_to_slot(ref, owner, model)
        route_ids = {vs.stable_id for vs in resolved_by_key.values()}
        if slot_ids != route_ids:
            raise GraphCompilationError(
                f"external output_routes target slots {route_ids!r} must match exactly "
                f"attributes carrying the same computed_by ({slot_ids!r}) "
                f"(part '{owner.path_string}')"
            )
        # Output order follows sorted route keys, not declaration order.
        output_slot_ids = [resolved_by_key[k].stable_id for k in sorted(routes.keys())]

    # The node id embeds id(binding), so it is only stable within this process.
    node_id = f"ext:{id(binding)}:{owner.path_string}"
    if node_id in graph.nodes:
        raise GraphCompilationError(f"Duplicate external compute node '{node_id}'")

    input_name_to_dep: dict[str, str] = {}
    for _iname, ref in binding.inputs.items():
        in_slot = _resolve_attribute_ref_to_slot(ref, owner, model)
        dep_node = f"val:{in_slot.path_string}"
        input_name_to_dep[_iname] = dep_node

    # Live binding reference: treat as frozen after compile_graph(); mutating it afterward is UB.
    graph.add_node(
        DependencyNode(
            node_id,
            NodeKind.EXTERNAL_COMPUTATION,
            metadata={
                "output_slot_ids": tuple(output_slot_ids),
                "binding_id": id(binding),
                "binding": binding,
                "owner_path": tuple(owner.instance_path),
                "input_name_to_dep": dict(input_name_to_dep),
            },
        )
    )
    # Input value nodes feed the compute node; the compute node feeds each output value node.
    for _dep in input_name_to_dep.values():
        graph.add_edge(_dep, node_id)
    for s in slots:
        graph.add_edge(node_id, f"val:{s.path_string}")

    handlers[node_id] = _make_external_handler(
        binding=binding,
        owner=owner,
        model=model,
        input_name_to_dep=input_name_to_dep,
        slots=slots,
        node_id=node_id,
    )


def _make_external_handler(
    *,
    binding: ExternalComputeBinding,
    owner: PartInstance,
    model: ConfiguredModel,
    input_name_to_dep: dict[str, str],
    slots: list[ValueSlot],
    node_id: str,
) -> Callable[..., None]:
    """Build the handler for an external-compute node.

    The returned handler takes ``(dep_values, ctx, run_result)`` and returns ``None``;
    outcomes are written to ``ctx`` records and ``run_result.failures`` instead of being
    raised.
    """
    from tg_model.execution.evaluator import RunResult
    from tg_model.execution.run_context import RunContext

    def handler(dep_values: dict[str, Any], ctx: RunContext, run_result: RunResult) -> None:
        # If the sync check raises TypeError, every output slot record is blocked
        # and the message is recorded as a failure.
        try:
            assert_sync_external(binding.external, context=node_id)
        except TypeError as e:
            msg = str(e)
            for s in slots:
                ctx.get_or_create_record(s.stable_id).block(msg)
            run_result.failures.append(msg)
            return

        inputs_dict: dict[str, Quantity] = {}
        try:
            for name, dep_node_id in input_name_to_dep.items():
                if dep_node_id not in dep_values:
                    raise KeyError(f"missing dependency {dep_node_id}")
                inputs_dict[name] = dep_values[dep_node_id]
            compute_fn = getattr(binding.external, "compute", None)
            if compute_fn is None:
                raise TypeError("external object has no compute()")
            res = compute_fn(inputs_dict)
            if not isinstance(res, ExternalComputeResult):
                raise TypeError(f"External compute must return ExternalComputeResult, got {type(res).__name__}")
            materialize_external_result(binding, res, owner, model, ctx, slots)
        except Exception as e:
            # Any error in gathering inputs, computing, or materializing marks every
            # output slot record as failed and is reported on the run result.
            msg = str(e)
            for s in slots:
                ctx.get_or_create_record(s.stable_id).fail(msg)
            run_result.failures.append(f"External compute '{node_id}' failed: {msg}")

    return handler


def _compile_rollup(
    slot: ValueSlot,
    slot_node_id: str,
    expr: Any,
    owner: PartInstance,
    graph: DependencyGraph,
    handlers: dict[str, Callable],
) -> None:
    """Add the value node, ``ROLLUP_COMPUTATION`` node, and handler for a roll-up slot.

    ``expr.value_func`` is applied to each direct child of ``owner``; children for which
    it returns a :class:`ValueSlot` become dependencies of the roll-up node.
    """
    graph.add_node(
        DependencyNode(
            slot_node_id,
            NodeKind.ATTRIBUTE_VALUE,
            slot_id=slot.stable_id,
        )
    )

    expr_node_id = f"rollup:{slot.path_string}"
    graph.add_node(
        DependencyNode(
            expr_node_id,
            NodeKind.ROLLUP_COMPUTATION,
            slot_id=slot.stable_id,
        )
    )
    graph.add_edge(expr_node_id, slot_node_id)

    child_slots: list[str] = []
    for child in owner.children:
        try:
            target_slot = expr.value_func(child)
            if isinstance(target_slot, ValueSlot):
                dep_node_id = f"val:{target_slot.path_string}"
                graph.add_edge(dep_node_id, expr_node_id)
                child_slots.append(dep_node_id)
        except AttributeError:
            # A child that raises AttributeError here is skipped and contributes nothing.
            pass

    from tg_model.execution.rollups import build_rollup_handler

    handlers[expr_node_id] = build_rollup_handler(expr.kind, expr.value_func, child_slots)


def _build_param_overrides_by_pkg(model: ConfiguredModel) -> dict[str, dict[str, ValueSlot]]:
    """Build a lookup: requirement package path_string → {param_name → source ValueSlot}.

    Used by :func:`_compile_requirement_package_tree` to wire requirement package
    parameters as computed values (sourced from the part side) when
    ``allocate(..., inputs=...)`` was used.
    """
    overrides: dict[str, dict[str, ValueSlot]] = {}
    for alloc in model.allocations:
        if not alloc.parameter_overrides:
            continue
        # A later allocation of the same requirement path replaces an earlier entry.
        overrides[alloc.requirement.path_string] = alloc.parameter_overrides
    return overrides


def _build_allocation_target_by_pkg(model: ConfiguredModel) -> dict[str, str]:
    """Build a lookup: requirement package path_string → allocation target path_string.

    Used to tag requirement constraint check nodes with the part they were allocated to.
    Raises :class:`GraphCompilationError` if any allocation targets a non-part element
    (e.g. a port).
    """
    targets: dict[str, str] = {}
    for alloc in model.allocations:
        part = _alloc_target_as_part(
            alloc.target,
            where=f"allocate({alloc.requirement.path_string!r}, ...)",
        )
        targets[alloc.requirement.path_string] = part.path_string
    return targets


def _resolve_symbol_to_slot(
    sym: Any,
    owner: PartInstance,
    model: ConfiguredModel,
) -> ValueSlot:
    """Resolve a unitflow Symbol to its corresponding ValueSlot.

    Uses the canonical symbol-id registry from AttributeRef.sym.

    When the symbol belongs to a different type than ``owner`` (e.g. a root
    system parameter referenced via
    :func:`~tg_model.model.definition_context.parameter_ref`), resolution
    falls back to ``model.root``.

    Fails loudly if the symbol cannot be resolved — silent misbinding
    is not acceptable in a safety-critical context.
    """
    from tg_model.model.refs import _symbol_id_to_path

    # The registry is keyed by id(sym), i.e. by symbol object identity.
    result = _symbol_id_to_path.get(id(sym))
    if result is not None:
        _owner_type, tg_path = result
        # First attempt: walk the registered path starting from the owner.
        current: Any = owner
        try:
            for segment in tg_path:
                current = getattr(current, segment)
            if isinstance(current, ValueSlot):
                return current
        except AttributeError:
            pass

        # Second attempt: the symbol was declared on the root type, so walk from the root.
        if _owner_type is not owner.definition_type and _owner_type is model.root.definition_type:
            current = model.root
            try:
                for segment in tg_path:
                    current = getattr(current, segment)
                if isinstance(current, ValueSlot):
                    return current
            except AttributeError:
                pass

        raise GraphCompilationError(
            f"Symbol '{getattr(sym, 'name', '?')}' has registered path {tg_path} "
            f"but could not be resolved under '{owner.path_string}'"
        )

    raise GraphCompilationError(
        f"Symbol '{getattr(sym, 'name', '?')}' is not a canonical AttributeRef-derived symbol. "
        f"All expression symbols must originate from model.attribute() or model.parameter() refs."
    )


def _compile_requirement_packages_from_parts(
    part: PartInstance,
    graph: DependencyGraph,
    handlers: dict[str, Callable],
    model: ConfiguredModel,
    param_overrides_by_pkg: dict[str, dict[str, ValueSlot]],
    allocation_target_by_pkg: dict[str, str],
) -> None:
    """Compile value slots and constraints declared on composable requirement packages."""
    compiled = part.definition_type.compile()
    tr = compiled.get("_type_registry", {})
    for name, node in compiled.get("nodes", {}).items():
        # Only requirement blocks that also have a type-registry entry are compiled here.
        if node.get("kind") != "requirement_block" or tr.get(name) is None:
            continue
        sub = getattr(part, name, None)
        if isinstance(sub, RequirementPackageInstance):
            _compile_requirement_package_tree(
                sub, graph, handlers, model, param_overrides_by_pkg, allocation_target_by_pkg
            )
    for child in part.children:
        _compile_requirement_packages_from_parts(
            child, graph, handlers, model, param_overrides_by_pkg, allocation_target_by_pkg
        )


def _compile_parameter_override(
    slot: ValueSlot,
    source_slot: ValueSlot,
    graph: DependencyGraph,
    handlers: dict[str, Callable],
) -> None:
    """Wire a requirement package parameter slot to pull its value from ``source_slot``.

    This replaces the ``INPUT_PARAMETER`` graph node with a simple pass-through
    expression so the evaluator computes the requirement parameter from the
    part's slot value rather than requiring it as an external input.
    """
    slot_node_id = f"val:{slot.path_string}"
    source_node_id = f"val:{source_slot.path_string}"
    expr_node_id = f"expr:{slot.path_string}"

    graph.add_node(DependencyNode(slot_node_id, NodeKind.ATTRIBUTE_VALUE, slot_id=slot.stable_id))
    graph.add_node(DependencyNode(expr_node_id, NodeKind.LOCAL_EXPRESSION, slot_id=slot.stable_id))
    graph.add_edge(source_node_id, expr_node_id)
    graph.add_edge(expr_node_id, slot_node_id)

    # The source node id is bound as a default argument so the handler carries its own copy.
    def handler(dep_values: dict[str, Any], _sid: str = source_node_id) -> Any:
        return dep_values[_sid]

    handlers[expr_node_id] = handler


def _compile_requirement_package_tree(
    pkg: RequirementPackageInstance,
    graph: DependencyGraph,
    handlers: dict[str, Callable],
    model: ConfiguredModel,
    param_overrides_by_pkg: dict[str, dict[str, ValueSlot]],
    allocation_target_by_pkg: dict[str, str],
    parent_target_path: str = "",
) -> None:
    """Compile the slots, constraints, and nested packages of one requirement package.

    Parameters
    ----------
    pkg : RequirementPackageInstance
        Package whose compiled ``nodes`` are walked.
    param_overrides_by_pkg, allocation_target_by_pkg
        Lookups keyed by package ``path_string`` (see the ``_build_*`` helpers).
    parent_target_path : str
        Allocation target inherited from the enclosing package when ``pkg`` has no entry
        of its own.

    Raises
    ------
    GraphCompilationError
        If a parameter/attribute member is not a :class:`ValueSlot`, or a package slot
        uses ``computed_by=`` or a ``RollupDecl``.
    """
    from tg_model.model.declarations.values import RollupDecl

    compiled = pkg.package_type.compile()
    overrides = param_overrides_by_pkg.get(pkg.path_string, {})
    # Resolve the allocation target: own entry takes priority, then inherit from parent.
    target_path = allocation_target_by_pkg.get(pkg.path_string, parent_target_path)

    # Declaration order matches Python 3.7+ dict insertion order (same as compile_type recording).
    for name, node in compiled["nodes"].items():
        kind = node["kind"]
        if kind in ("parameter", "attribute"):
            slot = getattr(pkg, name)
            if not isinstance(slot, ValueSlot):
                raise GraphCompilationError(
                    f"Expected ValueSlot for {pkg.path_string}.{name}, got {type(slot).__name__}"
                )
            if slot.metadata.get("_computed_by") is not None:
                raise GraphCompilationError(
                    f"computed_by= is not supported on requirement package slot '{slot.path_string}'"
                )
            expr_m = slot.metadata.get("_expr")
            if isinstance(expr_m, RollupDecl):
                raise GraphCompilationError(
                    f"RollupDecl is not supported on requirement package slot '{slot.path_string}'"
                )
            if kind == "parameter" and name in overrides:
                _compile_parameter_override(slot, overrides[name], graph, handlers)
            else:
                # Package slots are compiled with the model root as the resolution owner.
                _compile_slot(slot, model.root, graph, handlers, model)
        elif kind == "constraint":
            _compile_requirement_package_constraint(
                pkg, name, node, graph, handlers, model, target_path
            )
        elif kind == "requirement_block":
            inner = getattr(pkg, name, None)
            if isinstance(inner, RequirementPackageInstance):
                _compile_requirement_package_tree(
                    inner,
                    graph,
                    handlers,
                    model,
                    param_overrides_by_pkg,
                    allocation_target_by_pkg,
                    target_path,
                )


def _compile_requirement_package_constraint(
    pkg: RequirementPackageInstance,
    name: str,
    node: dict[str, Any],
    graph: DependencyGraph,
    handlers: dict[str, Callable],
    model: ConfiguredModel,
    allocation_target_path: str = "",
) -> None:
    """Add a ``CONSTRAINT_CHECK`` node and handler for a requirement package constraint.

    The node's metadata records the constraint name, the package path, and the
    allocation target path. Symbols are resolved with ``model.root`` as owner.

    Raises
    ------
    GraphCompilationError
        If the constraint has no ``_expr`` or one of its symbols cannot be resolved.
    """
    constraint_node_id = f"check:{pkg.path_string}.{name}"
    graph.add_node(
        DependencyNode(
            constraint_node_id,
            NodeKind.CONSTRAINT_CHECK,
            metadata={
                "name": f"{pkg.path_string}.{name}",
                "requirement_path": pkg.path_string,
                "allocation_target_path": allocation_target_path,
            },
        )
    )

    expr = node.get("metadata", {}).get("_expr")
    if expr is None:
        raise GraphCompilationError(
            f"Requirement package constraint '{pkg.path_string}.{name}' has no expr "
            f"(expected compile-time validation to reject this)."
        )

    def make_pkg_constraint_handler(
        constraint_expr: Any,
        cm: ConfiguredModel,
    ) -> Callable[..., bool]:
        def handler(dep_values: dict[str, Any]) -> bool:
            # Symbols whose value node is missing from dep_values are left out of the context.
            context: dict[Any, Any] = {}
            for sym in constraint_expr.free_symbols:
                dep_slot = _resolve_symbol_to_slot(sym, cm.root, cm)
                dep_node_id = f"val:{dep_slot.path_string}"
                if dep_node_id in dep_values:
                    context[sym] = dep_values[dep_node_id]
            return constraint_expr.evaluate(context)

        return handler

    if hasattr(expr, "free_symbols") and expr.free_symbols:
        for sym in expr.free_symbols:
            dep_slot = _resolve_symbol_to_slot(sym, model.root, model)
            dep_node_id = f"val:{dep_slot.path_string}"
            graph.add_edge(dep_node_id, constraint_node_id)
        handlers[constraint_node_id] = make_pkg_constraint_handler(expr, model)
    elif hasattr(expr, "evaluate"):
        handlers[constraint_node_id] = lambda dep_values, e=expr: bool(e.evaluate({}))
    elif callable(expr):
        handlers[constraint_node_id] = lambda dep_values, fn=expr: bool(fn(dep_values))
    else:
        handlers[constraint_node_id] = lambda dep_values, val=expr: bool(val)

    # Constant / symbol-free constraints need at least one incoming edge so validation does not
    # treat the check node as orphaned, and so evaluation runs after package inputs exist.
    if not graph.dependencies_of(constraint_node_id):
        anchor = _first_value_slot_under_requirement_package(pkg)
        if anchor is not None:
            graph.add_edge(f"val:{anchor.path_string}", constraint_node_id)


def _compile_constraints_for_part(
    part: PartInstance,
    graph: DependencyGraph,
    handlers: dict[str, Callable],
    model: ConfiguredModel,
) -> None:
    """Add ``CONSTRAINT_CHECK`` nodes for constraints declared on ``part``, then recurse.

    Symbols are resolved with ``part`` as owner.
    """
    compiled = part.definition_type.compile()
    for name, node in compiled.get("nodes", {}).items():
        if node["kind"] != "constraint":
            continue

        constraint_node_id = f"check:{part.path_string}.{name}"
        graph.add_node(
            DependencyNode(
                constraint_node_id,
                NodeKind.CONSTRAINT_CHECK,
                metadata={"name": f"{part.path_string}.{name}"},
            )
        )

        expr = node["metadata"].get("_expr")
        # NOTE(review): as laid out here, a constraint whose expr is None or lacks
        # ``free_symbols`` gets a check node but no handler and no edges — confirm intended.
        if expr is not None and hasattr(expr, "free_symbols"):
            for sym in expr.free_symbols:
                dep_slot = _resolve_symbol_to_slot(sym, part, model)
                dep_node_id = f"val:{dep_slot.path_string}"
                graph.add_edge(dep_node_id, constraint_node_id)

            def make_constraint_handler(
                constraint_expr: Any,
                owner_part: PartInstance,
                cm: ConfiguredModel,
            ) -> Callable[..., bool]:
                def handler(dep_values: dict[str, Any]) -> bool:
                    context = {}
                    for sym in constraint_expr.free_symbols:
                        dep_slot = _resolve_symbol_to_slot(sym, owner_part, cm)
                        dep_node_id = f"val:{dep_slot.path_string}"
                        if dep_node_id in dep_values:
                            context[sym] = dep_values[dep_node_id]
                    return constraint_expr.evaluate(context)

                return handler

            handlers[constraint_node_id] = make_constraint_handler(expr, part, model)

    for child in part.children:
        _compile_constraints_for_part(child, graph, handlers, model)


def _resolve_path_to_slot(
    path: list[str],
    part: PartInstance,
    group_name: str,
) -> ValueSlot:
    """Resolve a declaration path to its corresponding ValueSlot under a part."""
    current: Any = part
    try:
        for seg in path:
            current = getattr(current, seg)
        if isinstance(current, ValueSlot):
            return current
    except AttributeError:
        # Falls through to the GraphCompilationError below.
        pass
    raise GraphCompilationError(
        f"Solve group '{group_name}': path {list(path)} could not be resolved to a ValueSlot under '{part.path_string}'"
    )


def _compile_solve_groups_for_part(
    part: PartInstance,
    graph: DependencyGraph,
    handlers: dict[str, Callable],
    model: ConfiguredModel,
) -> None:
    """Add ``SOLVE_GROUP`` nodes and handlers for solve groups declared on ``part``, then recurse.

    For each group, declared unknown/given paths are resolved to slots, every free symbol
    of every equation must map to a declared unknown or given, and every declared
    unknown/given must appear in at least one equation.

    Raises
    ------
    GraphCompilationError
        On unresolvable paths or symbols, duplicate declarations, a slot declared as both
        unknown and given, an undeclared symbol, or a declared slot unused by any equation.
    """
    compiled = part.definition_type.compile()
    for name, node in compiled.get("nodes", {}).items():
        if node["kind"] != "solve_group":
            continue

        sg_node_id = f"solve:{part.path_string}.{name}"
        meta = node.get("metadata", {})
        unknown_paths = meta.get("_unknowns", [])
        given_paths = meta.get("_givens", [])
        equations = meta.get("_equations", [])

        unknown_slot_ids: set[str] = set()
        unknown_slot_by_path: dict[tuple[str, ...], str] = {}
        for upath in unknown_paths:
            slot = _resolve_path_to_slot(upath, part, name)
            if slot.stable_id in unknown_slot_ids:
                raise GraphCompilationError(f"Solve group '{name}': duplicate unknown '{'.'.join(upath)}'")
            unknown_slot_ids.add(slot.stable_id)
            unknown_slot_by_path[tuple(upath)] = slot.stable_id

        given_slot_ids: set[str] = set()
        for gpath in given_paths:
            slot = _resolve_path_to_slot(gpath, part, name)
            if slot.stable_id in given_slot_ids:
                raise GraphCompilationError(f"Solve group '{name}': duplicate given '{'.'.join(gpath)}'")
            if slot.stable_id in unknown_slot_ids:
                raise GraphCompilationError(
                    f"Solve group '{name}': '{'.'.join(gpath)}' declared as both unknown and given"
                )
            given_slot_ids.add(slot.stable_id)

        # Identity mapping: each unknown slot id maps to itself.
        target_slots = {sid: sid for sid in unknown_slot_ids}

        graph.add_node(
            DependencyNode(
                sg_node_id,
                NodeKind.SOLVE_GROUP,
                metadata={"name": name, "target_slots": target_slots},
            )
        )

        unknown_syms: list[Any] = []
        given_syms: list[Any] = []
        given_to_node_id: dict[Any, str] = {}
        found_unknown_ids: set[str] = set()
        found_given_ids: set[str] = set()

        for eq in equations:
            if not hasattr(eq, "free_symbols"):
                continue
            for sym in eq.free_symbols:
                # De-duplicate by object identity (``is``), not equality.
                if any(s is sym for s in unknown_syms) or any(s is sym for s in given_syms):
                    continue
                slot = _resolve_symbol_to_slot(sym, part, model)
                dep_node_id = f"val:{slot.path_string}"

                if slot.stable_id in unknown_slot_ids:
                    # Unknowns: edge from the solve node to the value node.
                    unknown_syms.append(sym)
                    found_unknown_ids.add(slot.stable_id)
                    graph.add_edge(sg_node_id, dep_node_id)
                elif slot.stable_id in given_slot_ids:
                    # Givens: edge from the value node to the solve node.
                    given_syms.append(sym)
                    found_given_ids.add(slot.stable_id)
                    given_to_node_id[sym] = dep_node_id
                    graph.add_edge(dep_node_id, sg_node_id)
                else:
                    raise GraphCompilationError(
                        f"Solve group '{name}': symbol '{getattr(sym, 'name', '?')}' "
                        f"resolves to slot '{slot.path_string}' which is not declared "
                        f"as unknown or given."
                    )

        missing_unknowns = unknown_slot_ids - found_unknown_ids
        if missing_unknowns:
            raise GraphCompilationError(
                f"Solve group '{name}': declared unknowns not found in any equation. "
                f"Missing slot IDs: {missing_unknowns}"
            )
        missing_givens = given_slot_ids - found_given_ids
        if missing_givens:
            raise GraphCompilationError(
                f"Solve group '{name}': declared givens not found in any equation. Missing slot IDs: {missing_givens}"
            )

        # Map id(symbol) -> slot id for unknowns whose registered path matches a declared
        # unknown path exactly; unknowns without such a match are left out of the mapping.
        sym_to_slot_id: dict[int, str] = {}
        for sym in unknown_syms:
            from tg_model.model.refs import _symbol_id_to_path

            result = _symbol_id_to_path.get(id(sym))
            if result is not None:
                _, sym_path = result
                slot_id = unknown_slot_by_path.get(tuple(sym_path))
                if slot_id is not None:
                    sym_to_slot_id[id(sym)] = slot_id

        from tg_model.execution.solve_groups import build_solve_group_handler

        handlers[sg_node_id] = build_solve_group_handler(
            equations,
            unknown_syms,
            given_syms,
            given_to_node_id,
            sym_to_slot_id,
        )

    for child in part.children:
        _compile_solve_groups_for_part(child, graph, handlers, model)