A lightweight wrapper around transitions for building async-safe, recoverable hierarchical state machines with minimal boilerplate.
Table of Contents
- ✨ Features
 - 🧠 Concepts & Overview
 - ⚙️ Installation & Setup
 - 🚀 Quickstart Tutorial
 - 🛠 How-to Guides
 - 📖 API Reference
 - 🔍 Troubleshooting & FAQ
 
✨ Features
- Built-in 
ready/faultstates - Global transitions: 
to_fault,reset - Optional state recovery (
recover__state) - Async task spawning and cancellation
 - Timeouts and auto-fault handling
 - Transition history recording with timestamps + durations
 - Guard conditions for blocking transitions
 - Global state change callbacks for logging/MQTT
 - Optional FastAPI router for HTTP access and visualization
 
🧠 Concepts & Overview
This library uses a declarative domain-specific language (DSL) to define state machines in a readable, strongly typed way.
- State → A leaf node in the state machine
 - StateGroup → Groups related states, creating hierarchical namespaces
 - Trigger → Named events that initiate transitions
 
Example:
class MyStates(StateGroup):
    idle: State = State()
    working: State = State()
class Triggers:
    begin = Trigger("begin")
    finish = Trigger("finish")
TRANSITIONS = [
    Triggers.finish.transition(MyStates.working, MyStates.idle),
]
Base States and Triggers
All machines include:
States:
ready(initial)fault(global error)
Triggers:
start,to_fault,reset
from state_machine.core import BaseStates, BaseTriggers
state_machine.trigger(BaseTriggers.RESET.value)
assert state_machine.state == BaseStates.READY.value
⚙️ Installation & Setup
pip install vention-state-machine
Optional dependencies:
- Graphviz (required for diagram generation)
 - FastAPI (for HTTP exposure of state machine)
 
🚀 Quickstart Tutorial
1. Define States and Triggers
from state_machine.defs import StateGroup, State, Trigger
class Running(StateGroup):
    picking: State = State()
    placing: State = State()
    homing: State = State()
class States:
    running = Running()
class Triggers:
    start = Trigger("start")
    finished_picking = Trigger("finished_picking")
    finished_placing = Trigger("finished_placing")
    finished_homing = Trigger("finished_homing")
    to_fault = Trigger("to_fault")
    reset = Trigger("reset")
2. Define Transitions
TRANSITIONS = [
    Triggers.start.transition("ready", States.running.picking),
    Triggers.finished_picking.transition(States.running.picking, States.running.placing),
    Triggers.finished_placing.transition(States.running.placing, States.running.homing),
    Triggers.finished_homing.transition(States.running.homing, States.running.picking),
]
3. Implement Your State Machine
from state_machine.core import StateMachine
from state_machine.decorators import on_enter_state, auto_timeout, guard, on_state_change
class CustomMachine(StateMachine):
    def __init__(self):
        super().__init__(states=States, transitions=TRANSITIONS)
    @on_enter_state(States.running.picking)
    @auto_timeout(5.0, Triggers.to_fault)
    def enter_picking(self, _):
        print("🔹 Entering picking")
    @on_enter_state(States.running.placing)
    def enter_placing(self, _):
        print("🔸 Entering placing")
    @on_enter_state(States.running.homing)
    def enter_homing(self, _):
        print("🔺 Entering homing")
    @guard(Triggers.reset)
    def check_safety_conditions(self) -> bool:
        return not self.estop_pressed
    @on_state_change
    def publish_state_to_mqtt(self, old_state: str, new_state: str, trigger: str):
        mqtt_client.publish("machine/state", {
            "old_state": old_state,
            "new_state": new_state,
            "trigger": trigger
        })
4. Start It
state_machine = StateMachine()
state_machine.start()
🛠 How-to Guides
Expose Over HTTP with FastAPI
from fastapi import FastAPI
from state_machine.router import build_router
from state_machine.core import StateMachine
state_machine = StateMachine(...)
state_machine.start()
app = FastAPI()
app.include_router(build_router(state_machine))
Endpoints:
GET /state→ Current stateGET /history→ Transition historyPOST /<trigger>→ Trigger a transitionGET /diagram.svg→ Graphviz diagram
Timeout Example
@auto_timeout(5.0, Triggers.to_fault)
def enter_state(self, _):
    ...
Recovery Example
state_machine = StateMachine(enable_last_state_recovery=True)
state_machine.start()  # will attempt recover__{last_state}
How-to: Run and Cancel Async Tasks
The state machine can manage background coroutines that should be cancelled automatically when a transition occurs (e.g. leaving a state or hitting fault).
import asyncio
from state_machine.core import StateMachine
from state_machine.defs import State, StateGroup, Trigger
from state_machine.decorators import on_enter_state
class MyStates(StateGroup):
    monitoring: State = State()
class States:
    monitoring = MyStates()
class Triggers:
    start = Trigger("start")
    to_fault = Trigger("to_fault")
TRANSITIONS = [
    Triggers.start.transition("ready", States.monitoring.monitoring),
]
class MonitoringMachine(StateMachine):
    def __init__(self):
        super().__init__(states=States, transitions=TRANSITIONS)
    @on_enter_state(States.monitoring.monitoring)
    def enter_monitoring(self, _):
        # Spawn an async background task (e.g. polling a sensor)
        async def poll_sensor():
            while True:
                print("🔎 Checking sensor...")
                await asyncio.sleep(1)
        self.spawn(poll_sensor())
The spawn() method wraps asyncio.create_task and tracks the task.
On a transition to fault (via to_fault) or any time cancel_tasks() is called, all tracked tasks are cancelled cleanly.
Manual cancellation example:
machine = MonitoringMachine()
machine.start()
# later in your app
await machine.cancel_tasks()
📖 API Reference
StateMachine
class StateMachine(HierarchicalGraphMachine):
    def __init__(
        self,
        states: Union[object, list[dict[str, Any]], None],
        *,
        transitions: Optional[list[dict[str, str]]] = None,
        history_size: Optional[int] = None,
        enable_last_state_recovery: bool = True,
        **kw: Any,
    )
Parameters:
states: Either a container of StateGroups or a list of state dicts.transitions: List of transition dictionaries, or[].history_size: Max number of entries in transition history (default 1000).enable_last_state_recovery: If True, machine can resume from last recorded state.
Methods
spawn(coro: Coroutine) -> asyncio.Task
Start a background coroutine and track it. Auto-cancelled on fault/reset.
cancel_tasks() -> None
Cancel all tracked tasks and timeouts.
set_timeout(state_name: str, seconds: float, trigger_fn: Callable[[], str]) -> None
Schedule a trigger if state_name stays active too long.
record_last_state() -> None
Save current state for recovery.
get_last_state() -> Optional[str]
Return most recently recorded state.
start() -> None
Enter machine (recover__... if applicable, else start).
Properties
history -> list[dict[str, Any]]
Full transition history with timestamps/durations.
get_last_history_entries(n: int) -> list[dict[str, Any]]
Return last n transitions.
Decorators
@on_enter_state(state: State)
Bind function to run on entry.
@on_exit_state(state: State)
Bind function to run on exit.
@auto_timeout(seconds: float, trigger: Trigger)
Auto-trigger if timeout expires.
@guard(*triggers: Trigger)
Guard transition; blocks if function returns False.
@on_state_change
Global callback (old_state, new_state, trigger) fired after each transition.
Router
def build_router(
    machine: StateMachine,
    triggers: Optional[list[Trigger]] = None
) -> fastapi.APIRouter
Exposes endpoints:
GET /stateGET /historyPOST /<trigger>GET /diagram.svg
🔍 Troubleshooting & FAQ
- Diagram endpoint returns 503 → Graphviz not installed.
 - Transitions blocked unexpectedly → Check guard conditions.
 - Callbacks not firing → Only successful transitions trigger them.
 - State not restored after restart → Ensure 
enable_last_state_recovery=True.