"""Discrete behavioral execution (Phase 6) on the ``RunContext`` spine.
Includes: state machines (:func:`dispatch_event`), activity control-flow
(:func:`dispatch_sequence`, :func:`dispatch_decision`, :func:`dispatch_merge`,
:func:`dispatch_fork_join`), and inter-part :func:`emit_item` with traces and scenario
validation.
**Guards and effects** both run under the same **subtree** scope on
:class:`~tg_model.execution.run_context.RunContext` (see that class for limits: API
discipline, not a sandbox).
**Transition commit order:** when a transition fires, the active state is updated to the
*target* state **before** the transition's action effect runs. Effects observe the
post-transition state via :meth:`~tg_model.execution.run_context.RunContext.get_active_behavior_state`.
Guards run **before** any state change.
**Effect errors:** if an effect callable raises, the active state is reverted to the
pre-transition state and the exception propagates (no :class:`BehaviorStep` is recorded).
**Fork/join (v0):** :func:`dispatch_fork_join` runs branch actions **serially** in a fixed
order (deterministic); it does not interleave or schedule parallel threads.
"""
from __future__ import annotations
from collections.abc import Callable
from dataclasses import dataclass, field
from enum import StrEnum
from typing import Any
from tg_model.execution.instances import PartInstance, PortInstance
from tg_model.execution.run_context import RunContext
[docs]
class DispatchOutcome(StrEnum):
"""Outcome of :func:`dispatch_event` (transition fired vs skipped)."""
FIRED = "fired"
NO_MATCH = "no_match"
"""No transition for the current state and event name (or no behavior spec)."""
GUARD_FAILED = "guard_failed"
"""A transition matched but its ``when`` guard returned false."""
[docs]
@dataclass(frozen=True)
class DispatchResult:
"""Structured result of :func:`dispatch_event`.
Notes
-----
``bool(result)`` is true only when a transition fired (legacy truthiness preserved).
"""
outcome: DispatchOutcome
def __bool__(self) -> bool:
return self.outcome == DispatchOutcome.FIRED
[docs]
class DecisionDispatchOutcome(StrEnum):
"""Outcome of :func:`dispatch_decision` (action ran vs not)."""
ACTION_RAN = "action_ran"
"""A branch or ``default_action`` ran (name in :attr:`DecisionDispatchResult.chosen_action`)."""
NO_ACTION = "no_action"
"""No branch matched and there was no ``default_action``."""
[docs]
@dataclass(frozen=True)
class DecisionDispatchResult:
"""Structured result of :func:`dispatch_decision`.
Notes
-----
``bool(result)`` is true when ``chosen_action`` is not ``None``.
"""
outcome: DecisionDispatchOutcome
chosen_action: str | None = None
merge_ran: bool = False
"""True when a paired merge was executed (after a chosen action)."""
def __bool__(self) -> bool:
return self.chosen_action is not None
[docs]
@dataclass(frozen=True)
class BehaviorStep:
"""One state-machine transition recorded in :class:`BehaviorTrace`."""
step_index: int
part_path: str
event_name: str
from_state: str
to_state: str
effect_name: str | None
[docs]
@dataclass(frozen=True)
class ItemFlowStep:
"""Inter-part item flow across one :class:`~tg_model.execution.connection_bindings.ConnectionBinding`."""
step_index: int
source_port_path: str
target_port_path: str
item_kind: str
payload: Any | None = None
[docs]
@dataclass(frozen=True)
class DecisionTraceStep:
"""Record of one :func:`dispatch_decision` invocation."""
step_index: int
part_path: str
decision_name: str
chosen_action: str | None
[docs]
@dataclass(frozen=True)
class ForkJoinTraceStep:
"""Record of one :func:`dispatch_fork_join` invocation."""
step_index: int
part_path: str
block_name: str
[docs]
@dataclass(frozen=True)
class MergeTraceStep:
"""Record of one :func:`dispatch_merge` invocation."""
step_index: int
part_path: str
merge_name: str
then_action: str | None
[docs]
@dataclass(frozen=True)
class SequenceTraceStep:
"""Record of one :func:`dispatch_sequence` invocation."""
step_index: int
part_path: str
sequence_name: str
[docs]
@dataclass
class BehaviorTrace:
"""Mutable collector for behavioral steps (multiple parallel lists).
Notes
-----
Paths are :attr:`PartInstance.path_string` values and declared **names**, not slot stable ids.
Global ordering uses ``step_index`` across lists (see :func:`behavior_trace_to_records`).
"""
steps: list[BehaviorStep] = field(default_factory=list)
item_flows: list[ItemFlowStep] = field(default_factory=list)
decision_steps: list[DecisionTraceStep] = field(default_factory=list)
fork_join_steps: list[ForkJoinTraceStep] = field(default_factory=list)
merge_steps: list[MergeTraceStep] = field(default_factory=list)
sequence_steps: list[SequenceTraceStep] = field(default_factory=list)
def _next_global_step_index(trace: BehaviorTrace) -> int:
return (
len(trace.steps)
+ len(trace.item_flows)
+ len(trace.decision_steps)
+ len(trace.fork_join_steps)
+ len(trace.merge_steps)
+ len(trace.sequence_steps)
)
def _eval_guard_or_predicate(
ctx: RunContext,
part: PartInstance,
fn: Callable[[RunContext, PartInstance], Any],
) -> bool:
"""Evaluate a transition ``when`` guard or decision branch predicate under behavior scope."""
ctx.push_behavior_effect_scope(part)
try:
return bool(fn(ctx, part))
finally:
ctx.pop_behavior_effect_scope()
def _behavior_spec(part: PartInstance) -> list[dict[str, Any]]:
raw = getattr(part.definition_type, "_tg_behavior_spec", None)
return list(raw or [])
def _initial_state_name(definition_type: type) -> str:
cached = getattr(definition_type, "_tg_initial_state_name", None)
if cached is not None:
return cached
compiled = definition_type.compile()
for name, node in compiled["nodes"].items():
if node["kind"] == "state" and node["metadata"].get("initial"):
return name
raise ValueError(f"No initial state declared on {definition_type.__name__}")
def _ensure_active_state(ctx: RunContext, part: PartInstance) -> str:
key = part.path_string
cur = ctx.get_active_behavior_state(key)
if cur is not None:
return cur
initial = _initial_state_name(part.definition_type)
ctx.set_active_behavior_state(key, initial)
return initial
[docs]
def dispatch_event(
ctx: RunContext,
part: PartInstance,
event_name: str,
*,
trace: BehaviorTrace | None = None,
) -> DispatchResult:
"""Dispatch one discrete event on ``part``'s state machine.
Parameters
----------
ctx : RunContext
Run state (discrete state + optional item payloads).
part : PartInstance
Part whose compiled type owns transitions.
event_name : str
Declared event **name** (last segment of the event ref path).
trace : BehaviorTrace, optional
When passed, appends a :class:`BehaviorStep` on success.
Returns
-------
DispatchResult
:attr:`~DispatchOutcome.NO_MATCH`, :attr:`~DispatchOutcome.GUARD_FAILED`, or fired.
Raises
------
Exception
Any guard/effect error propagates; if the effect fails after the state advanced,
the prior discrete state is restored first.
Notes
-----
``bool(result)`` is true only when a transition fired.
"""
spec = _behavior_spec(part)
if not spec:
return DispatchResult(DispatchOutcome.NO_MATCH)
current = _ensure_active_state(ctx, part)
for tr in spec:
if tr["from_state"].path[-1] != current:
continue
if tr["on"].path[-1] != event_name:
continue
guard = tr.get("when")
if guard is not None and not _eval_guard_or_predicate(ctx, part, guard):
return DispatchResult(DispatchOutcome.GUARD_FAILED)
to_name = tr["to_state"].path[-1]
ctx.set_active_behavior_state(part.path_string, to_name)
eff_name = tr.get("effect")
try:
if eff_name:
_run_action_effect(part.definition_type, eff_name, ctx, part)
except Exception:
ctx.set_active_behavior_state(part.path_string, current)
raise
finally:
ctx.clear_item_payload(part.path_string, event_name)
if trace is not None:
trace.steps.append(
BehaviorStep(
step_index=_next_global_step_index(trace),
part_path=part.path_string,
event_name=event_name,
from_state=current,
to_state=to_name,
effect_name=eff_name,
)
)
return DispatchResult(DispatchOutcome.FIRED)
return DispatchResult(DispatchOutcome.NO_MATCH)
def _run_action_effect(definition_type: type, action_name: str, ctx: RunContext, part: PartInstance) -> None:
ctx.push_behavior_effect_scope(part)
try:
effects = getattr(definition_type, "_tg_action_effects", None)
if effects is not None:
fn = effects.get(action_name)
if fn is not None:
fn(ctx, part)
return
compiled = definition_type.compile()
node = compiled["nodes"].get(action_name)
if node is None or node["kind"] != "action":
raise KeyError(f"No action {action_name!r} on {definition_type.__name__}")
fn = node["metadata"].get("_effect")
if callable(fn):
fn(ctx, part)
finally:
ctx.pop_behavior_effect_scope()
[docs]
def behavior_authoring_projection(definition_type: type) -> dict[str, Any]:
"""Return a JSON-oriented projection of behavioral declarations on ``definition_type``.
Parameters
----------
definition_type : type
Compiled part/system type.
Returns
-------
dict
Node name lists by kind, serialized transitions, and edges (refs via :meth:`~tg_model.model.refs.Ref.to_dict`).
Notes
-----
Tooling hook only: not a strict schema for every metadata field.
"""
compiled = definition_type.compile()
nodes = compiled["nodes"]
def names(kind: str) -> list[str]:
return sorted(n for n, d in nodes.items() if d["kind"] == kind)
return {
"owner": compiled["owner"],
"states": names("state"),
"events": names("event"),
"actions": names("action"),
"guards": names("guard"),
"merges": names("merge"),
"decisions": names("decision"),
"fork_joins": names("fork_join"),
"sequences": names("sequence"),
"item_kinds": names("item_kind"),
"scenarios": names("scenario"),
"transitions": compiled.get("behavior_transitions", []),
"edges": compiled.get("edges", []),
}
[docs]
def scenario_expected_event_names(definition_type: type, scenario_name: str) -> list[str]:
"""Return authored ``expected_event_order`` names for a scenario node.
Raises
------
KeyError
If the scenario is missing.
ValueError
If metadata is malformed.
"""
compiled = definition_type.compile()
node = compiled["nodes"].get(scenario_name)
if node is None or node["kind"] != "scenario":
raise KeyError(f"No scenario {scenario_name!r} on {definition_type.__name__}")
order = node["metadata"].get("_expected_event_order")
if not isinstance(order, list):
raise ValueError(f"Malformed scenario {scenario_name!r}")
return list(order)
def _scenario_node_metadata(definition_type: type, scenario_name: str) -> dict[str, Any]:
compiled = definition_type.compile()
node = compiled["nodes"].get(scenario_name)
if node is None or node["kind"] != "scenario":
raise KeyError(f"No scenario {scenario_name!r} on {definition_type.__name__}")
return node["metadata"]
[docs]
def trace_events_chronological(trace: BehaviorTrace) -> list[tuple[str, str]]:
"""List ``(part_path, event_name)`` for state-machine steps sorted by ``step_index``.
Returns
-------
list[tuple[str, str]]
Transition events only (excludes decisions, merges, item flows).
"""
ordered = sorted(trace.steps, key=lambda s: s.step_index)
return [(s.part_path, s.event_name) for s in ordered]
[docs]
def validate_scenario_trace(
*,
definition_type: type,
scenario_name: str,
part_path: str,
trace: BehaviorTrace,
ctx: RunContext | None = None,
root: PartInstance | None = None,
) -> tuple[bool, list[str]]:
"""Compare trace slices to an authored scenario (partial contracts).
Parameters
----------
definition_type : type
Owner type of the scenario declaration.
scenario_name : str
Scenario node name on that type.
part_path : str
Instance path string for transition-focused checks.
trace : BehaviorTrace
Collected behavioral steps.
ctx : RunContext, optional
Needed when checking final discrete state.
root : PartInstance, optional
Configured root when validating ``expected_interaction_order``.
Returns
-------
ok : bool
True when every enabled check passes.
errors : list[str]
Human-readable failure messages (empty when ``ok``).
Notes
-----
This is a **bundle of independent checks**, not one end-to-end story:
- Transition events for ``part_path`` vs ``expected_event_order``.
- Optional final/initial discrete state (``ctx`` needed for final).
- Optional global transition order via :func:`trace_events_chronological` (state-machine
steps only — not decisions, merges, or item flows).
- Optional item kind order from :class:`ItemFlowStep`.
Passing everything still does not prove full causal intent; combine with tests or tooling.
Call with ``ctx`` from **outside** behavior effects when checking final state.
For ``expected_interaction_order``, pass ``root`` (configured root part) so global
ordering can be compared. For ``expected_item_kind_order``, compares item flow kinds.
"""
errors: list[str] = []
expected = scenario_expected_event_names(definition_type, scenario_name)
fired = [s.event_name for s in trace.steps if s.part_path == part_path]
if fired != expected:
errors.append(f"expected events {expected!r}, got {fired!r}")
meta = _scenario_node_metadata(definition_type, scenario_name)
final_s = meta.get("_expected_final_behavior_state")
if ctx is not None and final_s is not None:
cur = ctx.get_active_behavior_state(part_path)
if cur != final_s:
errors.append(f"expected final behavior state {final_s!r}, got {cur!r}")
initial_s = meta.get("_initial_behavior_state")
if initial_s is not None:
part_steps = [s for s in trace.steps if s.part_path == part_path]
if part_steps:
first = min(part_steps, key=lambda s: s.step_index)
if first.from_state != initial_s:
errors.append(
f"expected initial behavior state {initial_s!r}, first transition from {first.from_state!r}"
)
iord = meta.get("_expected_interaction_order")
if iord:
if root is None:
errors.append("expected_interaction_order requires validate_scenario_trace(..., root=<PartInstance>)")
else:
if root.definition_type is not definition_type:
errors.append("root part type does not match scenario definition_type")
else:
resolved: list[tuple[str, str]] = []
for pair in iord:
if len(pair) != 2:
errors.append(f"malformed interaction pair {pair!r}")
break
rel, ev = pair[0], pair[1]
full = root.path_string if not rel else f"{root.path_string}.{rel}"
resolved.append((full, ev))
if not errors:
actual = trace_events_chronological(trace)
if actual != resolved:
errors.append(f"expected interaction order {resolved!r}, got {actual!r}")
iko = meta.get("_expected_item_kind_order")
if iko is not None:
actual_k = [s.item_kind for s in sorted(trace.item_flows, key=lambda x: x.step_index)]
if actual_k != list(iko):
errors.append(f"expected item kind order {list(iko)!r}, got {actual_k!r}")
return (not errors, errors)
[docs]
def dispatch_decision(
ctx: RunContext,
part: PartInstance,
decision_name: str,
*,
trace: BehaviorTrace | None = None,
run_merge: bool = True,
) -> DecisionDispatchResult:
"""Run a declared ``decision``: first branch whose guard passes runs its action.
Parameters
----------
ctx : RunContext
Current run state.
part : PartInstance
Owner of the decision declaration.
decision_name : str
Declared decision node name.
trace : BehaviorTrace, optional
Records :class:`DecisionTraceStep` when provided.
run_merge : bool, default True
When False, skip automatic paired merge (advanced sequencing).
Raises
------
KeyError
If ``decision_name`` is not declared on ``part.definition_type``.
Returns
-------
DecisionDispatchResult
``outcome`` is :attr:`~DecisionDispatchOutcome.NO_ACTION` only when no branch matched
and there is no ``default_action``. ``bool(result)`` is true iff an action ran.
Notes
-----
If the decision was declared with ``merge_point=`` to a :meth:`merge` node, also runs
that merge's ``then_action`` after the branch action (unless ``run_merge=False`` for
manual :func:`dispatch_merge` — do **not** call :func:`dispatch_merge` again for the
same merge when pairing is enabled, or the continuation runs twice).
Branches with ``guard is None`` match unconditionally (place them last unless you
intend a catch-all).
"""
specs = getattr(part.definition_type, "_tg_decision_specs", None) or {}
spec = specs.get(decision_name)
if spec is None:
raise KeyError(f"No decision {decision_name!r} on {part.definition_type.__name__}")
chosen: str | None = None
for pred, aname in spec["branches"]:
if pred is None:
chosen = aname
break
if _eval_guard_or_predicate(ctx, part, pred):
chosen = aname
break
if chosen is None:
chosen = spec.get("default_action")
if chosen is not None:
_run_action_effect(part.definition_type, chosen, ctx, part)
if trace is not None:
idx = _next_global_step_index(trace)
trace.decision_steps.append(
DecisionTraceStep(
step_index=idx,
part_path=part.path_string,
decision_name=decision_name,
chosen_action=chosen,
)
)
merge_name = spec.get("merge_name")
merge_ran = False
if chosen is not None and merge_name and run_merge:
dispatch_merge(ctx, part, merge_name, trace=trace)
merge_ran = True
outcome = DecisionDispatchOutcome.ACTION_RAN if chosen is not None else DecisionDispatchOutcome.NO_ACTION
return DecisionDispatchResult(
outcome=outcome,
chosen_action=chosen,
merge_ran=merge_ran,
)
[docs]
def dispatch_merge(
ctx: RunContext,
part: PartInstance,
merge_name: str,
*,
trace: BehaviorTrace | None = None,
) -> str | None:
"""Continue at a declared ``merge``: runs optional ``then_action`` (shared after branches).
Call after exclusive branches (e.g. following :func:`dispatch_decision`) to model a
methodology **Merge** node. If no ``then_action`` was declared, returns ``None`` and
only records the trace step when ``trace`` is set.
Raises
------
KeyError
If ``merge_name`` is not declared.
"""
specs = getattr(part.definition_type, "_tg_merge_specs", None) or {}
spec = specs.get(merge_name)
if spec is None:
raise KeyError(f"No merge {merge_name!r} on {part.definition_type.__name__}")
then_a = spec.get("then_action")
if then_a:
_run_action_effect(part.definition_type, then_a, ctx, part)
if trace is not None:
idx = _next_global_step_index(trace)
trace.merge_steps.append(
MergeTraceStep(
step_index=idx,
part_path=part.path_string,
merge_name=merge_name,
then_action=then_a,
)
)
return then_a
[docs]
def dispatch_fork_join(
ctx: RunContext,
part: PartInstance,
block_name: str,
*,
trace: BehaviorTrace | None = None,
) -> None:
"""Execute a ``fork_join`` block: branches run **one after another** (fixed list order).
Raises
------
KeyError
If ``block_name`` is not declared.
v0 semantics are **deterministic serial** execution, not OS-level parallelism or
arbitrary interleaving; ``fork``/``join`` name the *logical* activity structure.
"""
specs = getattr(part.definition_type, "_tg_fork_join_specs", None) or {}
spec = specs.get(block_name)
if spec is None:
raise KeyError(f"No fork_join {block_name!r} on {part.definition_type.__name__}")
for branch in spec["branches"]:
for aname in branch:
_run_action_effect(part.definition_type, aname, ctx, part)
then_a = spec.get("then_action")
if then_a:
_run_action_effect(part.definition_type, then_a, ctx, part)
if trace is not None:
idx = _next_global_step_index(trace)
trace.fork_join_steps.append(
ForkJoinTraceStep(
step_index=idx,
part_path=part.path_string,
block_name=block_name,
)
)
[docs]
def dispatch_sequence(
ctx: RunContext,
part: PartInstance,
sequence_name: str,
*,
trace: BehaviorTrace | None = None,
) -> None:
"""Run a declared linear ``sequence`` of actions (methodology default simplicity rule).
Raises
------
KeyError
If ``sequence_name`` is not declared.
"""
specs = getattr(part.definition_type, "_tg_sequence_specs", None) or {}
step_names = specs.get(sequence_name)
if step_names is None:
raise KeyError(f"No sequence {sequence_name!r} on {part.definition_type.__name__}")
for aname in step_names:
_run_action_effect(part.definition_type, aname, ctx, part)
if trace is not None:
idx = _next_global_step_index(trace)
trace.sequence_steps.append(
SequenceTraceStep(
step_index=idx,
part_path=part.path_string,
sequence_name=sequence_name,
)
)
[docs]
def emit_item(
ctx: RunContext,
cm: Any,
source_port: PortInstance,
item_kind: str,
payload: Any,
*,
trace: BehaviorTrace | None = None,
) -> list[DispatchResult]:
"""Send an item from ``source_port`` across structural connections.
Parameters
----------
ctx : RunContext
Stages payloads per receiving part/event.
cm : ConfiguredModel
Supplies ``connections`` and :meth:`~tg_model.execution.configured_model.ConfiguredModel.handle`.
source_port : PortInstance
Emitting port.
item_kind : str
Event name / kind matched on receivers; may be filtered by binding ``carrying``.
payload : Any
Opaque payload for receiver effects.
trace : BehaviorTrace, optional
Records :class:`ItemFlowStep` rows.
Returns
-------
list[DispatchResult]
One result per matched connection (may be empty).
Notes
-----
For each matching :class:`~tg_model.execution.connection_bindings.ConnectionBinding`
(same source port; optional ``carrying`` must match ``item_kind``), dispatches
``item_kind`` on the receiving part. Payload is staged via
:meth:`RunContext.prime_item_payload` and cleared if dispatch does not fire.
Bindings are visited in ``cm.connections`` order (deterministic for a frozen model).
"""
results: list[DispatchResult] = []
for cb in cm.connections:
if cb.source.stable_id != source_port.stable_id:
continue
if cb.carrying is not None and cb.carrying != item_kind:
continue
tgt = cb.target
parent_path = ".".join(tgt.instance_path[:-1])
receiver = cm.handle(parent_path)
if not isinstance(receiver, PartInstance):
continue
ctx.prime_item_payload(receiver.path_string, item_kind, payload)
if trace is not None:
trace.item_flows.append(
ItemFlowStep(
step_index=_next_global_step_index(trace),
source_port_path=source_port.path_string,
target_port_path=tgt.path_string,
item_kind=item_kind,
payload=payload,
)
)
res = dispatch_event(ctx, receiver, item_kind, trace=trace)
if res.outcome != DispatchOutcome.FIRED:
ctx.clear_item_payload(receiver.path_string, item_kind)
results.append(res)
return results
[docs]
def behavior_trace_to_records(trace: BehaviorTrace) -> list[dict[str, Any]]:
"""Flatten ``trace`` into JSON-friendly dict rows sorted by ``step_index``.
Parameters
----------
trace : BehaviorTrace
Collected steps from one or more dispatch calls.
Returns
-------
list[dict]
Each dict has ``kind``, ``step_index``, and kind-specific keys.
"""
out: list[dict[str, Any]] = []
for s in trace.steps:
out.append(
{
"kind": "transition",
"step_index": s.step_index,
"part_path": s.part_path,
"event_name": s.event_name,
"from_state": s.from_state,
"to_state": s.to_state,
"effect_name": s.effect_name,
}
)
for s in trace.item_flows:
rec: dict[str, Any] = {
"kind": "item_flow",
"step_index": s.step_index,
"source_port_path": s.source_port_path,
"target_port_path": s.target_port_path,
"item_kind": s.item_kind,
}
if s.payload is not None:
rec["payload"] = s.payload
out.append(rec)
for s in trace.decision_steps:
out.append(
{
"kind": "decision",
"step_index": s.step_index,
"part_path": s.part_path,
"decision_name": s.decision_name,
"chosen_action": s.chosen_action,
}
)
for s in trace.fork_join_steps:
out.append(
{
"kind": "fork_join",
"step_index": s.step_index,
"part_path": s.part_path,
"block_name": s.block_name,
}
)
for s in trace.merge_steps:
out.append(
{
"kind": "merge",
"step_index": s.step_index,
"part_path": s.part_path,
"merge_name": s.merge_name,
"then_action": s.then_action,
}
)
for s in trace.sequence_steps:
out.append(
{
"kind": "sequence",
"step_index": s.step_index,
"part_path": s.part_path,
"sequence_name": s.sequence_name,
}
)
out.sort(key=lambda r: r["step_index"])
return out