A lightweight wrapper around the transitions library for building async-safe, recoverable hierarchical state machines with minimal boilerplate.
Table of Contents
- ✨ Features
- 🧠 Concepts & Overview
- ⚙️ Installation & Setup
- 🚀 Quickstart Tutorial
- 🛠 How-to Guides
- 📖 API Reference
- 🔍 Troubleshooting & FAQ
✨ Features
- Built-in ready / fault states
- Global transitions: to_fault, reset
- Optional state recovery (recover__state)
- Async task spawning and cancellation
- Timeouts and auto-fault handling
- Transition history recording with timestamps + durations
- Guard conditions for blocking transitions
- Global state change callbacks for logging/MQTT
- Optional FastAPI router for HTTP access and visualization
🧠 Concepts & Overview
This library uses a declarative domain-specific language (DSL) to define state machines in a readable, strongly typed way.
- State → A leaf node in the state machine
- StateGroup → Groups related states, creating hierarchical namespaces
- Trigger → Named events that initiate transitions
Example:
class MyStates(StateGroup):
    idle: State = State()
    working: State = State()

class Triggers:
    begin = Trigger("begin")
    finish = Trigger("finish")

TRANSITIONS = [
    Triggers.finish.transition(MyStates.working, MyStates.idle),
]
Base States and Triggers
All machines include:
States:
- ready (initial)
- fault (global error)
Triggers:
- start, to_fault, reset
from state_machine.core import BaseStates, BaseTriggers
state_machine.trigger(BaseTriggers.RESET.value)
assert state_machine.state == BaseStates.READY.value
⚙️ Installation & Setup
pip install vention-state-machine
Optional dependencies:
- Graphviz (required for diagram generation)
- FastAPI (for HTTP exposure of state machine)
🚀 Quickstart Tutorial
1. Define States and Triggers
from state_machine.defs import StateGroup, State, Trigger
class Running(StateGroup):
    picking: State = State()
    placing: State = State()
    homing: State = State()

class States:
    running = Running()

class Triggers:
    start = Trigger("start")
    finished_picking = Trigger("finished_picking")
    finished_placing = Trigger("finished_placing")
    finished_homing = Trigger("finished_homing")
    to_fault = Trigger("to_fault")
    reset = Trigger("reset")
2. Define Transitions
TRANSITIONS = [
    Triggers.start.transition("ready", States.running.picking),
    Triggers.finished_picking.transition(States.running.picking, States.running.placing),
    Triggers.finished_placing.transition(States.running.placing, States.running.homing),
    Triggers.finished_homing.transition(States.running.homing, States.running.picking),
]
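To make the cycle defined by the table above concrete, here is a minimal plain-Python sketch that does not use the library. It assumes each transition reduces to a (trigger, source) → destination lookup. The state names such as `running_picking` and the helpers `fire` and `run` are hypothetical; the library's actual state naming and dispatch may differ.

```python
# Plain-Python stand-in for the TRANSITIONS list; not the library's implementation.
# State names such as "running_picking" are illustrative assumptions.
TABLE = {
    ("start", "ready"): "running_picking",
    ("finished_picking", "running_picking"): "running_placing",
    ("finished_placing", "running_placing"): "running_homing",
    ("finished_homing", "running_homing"): "running_picking",
}


def fire(state: str, trigger: str) -> str:
    """Return the next state, or the unchanged state if the trigger does not apply."""
    return TABLE.get((trigger, state), state)


def run(triggers: list[str], state: str = "ready") -> list[str]:
    """Apply triggers in order and return every state visited."""
    visited = [state]
    for trigger in triggers:
        state = fire(state, trigger)
        visited.append(state)
    return visited


path = run(["start", "finished_picking", "finished_placing", "finished_homing"])
print(" -> ".join(path))
# ready -> running_picking -> running_placing -> running_homing -> running_picking
```

The last trigger loops back to picking, which is what makes the quickstart machine cycle until a fault or reset occurs.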
3. Implement Your State Machine
from state_machine.core import StateMachine
from state_machine.decorators import on_enter_state, auto_timeout, guard, on_state_change
class CustomMachine(StateMachine):
    def __init__(self):
        super().__init__(states=States, transitions=TRANSITIONS)
        # Read by the reset guard below; update it from your safety I/O.
        self.estop_pressed = False

    @on_enter_state(States.running.picking)
    @auto_timeout(5.0, Triggers.to_fault)
    def enter_picking(self, _):
        print("🔹 Entering picking")

    @on_enter_state(States.running.placing)
    def enter_placing(self, _):
        print("🔸 Entering placing")

    @on_enter_state(States.running.homing)
    def enter_homing(self, _):
        print("🔺 Entering homing")

    @guard(Triggers.reset)
    def check_safety_conditions(self) -> bool:
        return not self.estop_pressed

    @on_state_change
    def publish_state_to_mqtt(self, old_state: str, new_state: str, trigger: str):
        # mqtt_client is assumed to be an MQTT client created elsewhere in your app.
        mqtt_client.publish("machine/state", {
            "old_state": old_state,
            "new_state": new_state,
            "trigger": trigger
        })
4. Start It
state_machine = CustomMachine()
state_machine.start()
🛠 How-to Guides
Expose Over HTTP with FastAPI
from fastapi import FastAPI
from state_machine.router import build_router
from state_machine.core import StateMachine
state_machine = StateMachine(...)
state_machine.start()
app = FastAPI()
app.include_router(build_router(state_machine))
Endpoints:
- GET /state → Current state
- GET /history → Transition history
- POST /<trigger> → Trigger a transition
- GET /diagram.svg → Graphviz diagram
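As a rough illustration of calling these endpoints from a client, the sketch below builds the requests with the standard library only. The base URL, the helper names (`state_request`, `trigger_request`, `send`), and the assumption that responses are JSON are not taken from the library; adjust them to your deployment.

```python
import json
from urllib.request import Request, urlopen

BASE_URL = "http://localhost:8000"  # assumption: where the FastAPI app is served


def state_request(base_url: str = BASE_URL) -> Request:
    """Build a GET /state request."""
    return Request(f"{base_url}/state", method="GET")


def trigger_request(trigger: str, base_url: str = BASE_URL) -> Request:
    """Build a POST /<trigger> request, e.g. POST /start."""
    return Request(f"{base_url}/{trigger}", method="POST")


def send(request: Request) -> object:
    """Send the request and decode the body (assumes the server replies with JSON)."""
    with urlopen(request, timeout=5) as response:
        return json.loads(response.read().decode("utf-8"))
```

With the server running, `send(trigger_request("start"))` followed by `send(state_request())` would fire the start trigger and then read back the current state.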
Timeout Example
@auto_timeout(5.0, Triggers.to_fault)
def enter_state(self, _):
    ...
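The behaviour behind a state timeout can be pictured with the standalone asyncio sketch below. It is not the library's implementation: `TimeoutSketch`, `enter`, and the state names are hypothetical, and the sketch assumes a timer is armed on entry, cancelled on exit, and fires only if the state is still active.

```python
import asyncio
from typing import Optional


class TimeoutSketch:
    """Illustrative stand-in, not the library's StateMachine."""

    def __init__(self) -> None:
        self.state = "ready"
        self._timer: Optional[asyncio.Task] = None

    def enter(self, state: str, timeout: Optional[float] = None,
              on_timeout: str = "fault") -> None:
        # Leaving the previous state cancels its pending timer.
        if self._timer is not None:
            self._timer.cancel()
            self._timer = None
        self.state = state
        if timeout is not None:
            self._timer = asyncio.create_task(self._expire(state, timeout, on_timeout))

    async def _expire(self, state: str, seconds: float, target: str) -> None:
        await asyncio.sleep(seconds)
        # Fire only if the machine is still in the state that armed the timer.
        if self.state == state:
            self.state = target


async def demo() -> tuple[str, str]:
    slow = TimeoutSketch()
    slow.enter("picking", timeout=0.01)
    await asyncio.sleep(0.1)   # stays in picking too long, so the timer fires

    fast = TimeoutSketch()
    fast.enter("picking", timeout=0.01)
    fast.enter("placing")      # leaves in time, so the timer is cancelled
    await asyncio.sleep(0.1)
    return slow.state, fast.state


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The first machine ends in the fault state and the second stays in placing, which mirrors what `@auto_timeout(5.0, Triggers.to_fault)` is described as doing for an entered state.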
Recovery Example
state_machine = StateMachine(states=States, transitions=TRANSITIONS, enable_last_state_recovery=True)
state_machine.start() # will attempt recover__{last_state}
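The recovery decision at startup can be sketched as follows. How the library actually persists the last state is not documented here, so the JSON file and the helpers `save_state`, `load_state`, and `startup_trigger` are assumptions for illustration only.

```python
import json
import tempfile
from pathlib import Path
from typing import Optional


def save_state(path: Path, state: str) -> None:
    """Persist the current state name (assumed storage: a small JSON file)."""
    path.write_text(json.dumps({"last_state": state}))


def load_state(path: Path) -> Optional[str]:
    """Return the saved state name, or None if nothing was saved."""
    if not path.exists():
        return None
    return json.loads(path.read_text()).get("last_state")


def startup_trigger(path: Path, recovery_enabled: bool = True) -> str:
    """Pick the trigger to fire on start: recover__<state> if one was saved, else start."""
    last = load_state(path) if recovery_enabled else None
    return f"recover__{last}" if last else "start"


with tempfile.TemporaryDirectory() as tmp:
    store = Path(tmp) / "last_state.json"  # hypothetical storage location
    first_boot = startup_trigger(store)
    save_state(store, "running_placing")
    after_restart = startup_trigger(store)
    disabled = startup_trigger(store, recovery_enabled=False)

print(first_boot, after_restart, disabled)
# start recover__running_placing start
```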
Run and Cancel Async Tasks
The state machine can manage background coroutines that should be cancelled automatically when a transition occurs (e.g. leaving a state or hitting fault).
import asyncio
from state_machine.core import StateMachine
from state_machine.defs import State, StateGroup, Trigger
from state_machine.decorators import on_enter_state
class MyStates(StateGroup):
    monitoring: State = State()

class States:
    monitoring = MyStates()

class Triggers:
    start = Trigger("start")
    to_fault = Trigger("to_fault")

TRANSITIONS = [
    Triggers.start.transition("ready", States.monitoring.monitoring),
]

class MonitoringMachine(StateMachine):
    def __init__(self):
        super().__init__(states=States, transitions=TRANSITIONS)

    @on_enter_state(States.monitoring.monitoring)
    def enter_monitoring(self, _):
        # Spawn an async background task (e.g. polling a sensor)
        async def poll_sensor():
            while True:
                print("🔎 Checking sensor...")
                await asyncio.sleep(1)

        self.spawn(poll_sensor())
The spawn() method wraps asyncio.create_task and tracks the task.
On a transition to fault (via to_fault) or any time cancel_tasks() is called, all tracked tasks are cancelled cleanly.
Manual cancellation example:
machine = MonitoringMachine()
machine.start()
# later in your app
machine.cancel_tasks()  # synchronous: the API reference gives cancel_tasks() -> None
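The spawn-and-track pattern described above can be reproduced in isolation. The sketch below is not the library's code: `TaskTracker` is a hypothetical class that assumes tracking means holding tasks in a set and cancelling them on request.

```python
import asyncio
from typing import Coroutine


class TaskTracker:
    """Illustrative stand-in for the spawn()/cancel_tasks() pair; not the library's code."""

    def __init__(self) -> None:
        self._tasks: set[asyncio.Task] = set()

    def spawn(self, coro: Coroutine) -> asyncio.Task:
        # asyncio.create_task needs a running event loop, so call this from async code.
        task = asyncio.create_task(coro)
        self._tasks.add(task)
        # Forget the task once it finishes so the set does not grow without bound.
        task.add_done_callback(self._tasks.discard)
        return task

    def cancel_tasks(self) -> None:
        # Request cancellation; each task stops at its next await point.
        for task in list(self._tasks):
            task.cancel()


async def demo() -> tuple[int, bool]:
    ticks = 0

    async def poll_sensor() -> None:
        nonlocal ticks
        while True:
            ticks += 1
            await asyncio.sleep(0.01)

    tracker = TaskTracker()
    task = tracker.spawn(poll_sensor())
    await asyncio.sleep(0.05)
    tracker.cancel_tasks()
    # Wait for the cancellation to be delivered without raising here.
    await asyncio.gather(task, return_exceptions=True)
    return ticks, task.cancelled()


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Because cancellation is only a request, the demo waits on the task afterwards before checking `task.cancelled()`.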
📖 API Reference
StateMachine
class StateMachine(HierarchicalGraphMachine):
    def __init__(
        self,
        states: Union[object, list[dict[str, Any]], None],
        *,
        transitions: Optional[list[dict[str, str]]] = None,
        history_size: Optional[int] = None,
        enable_last_state_recovery: bool = True,
        **kw: Any,
    )
Parameters:
- states: Either a container of StateGroups or a list of state dicts.
- transitions: List of transition dictionaries, or [].
- history_size: Max number of entries in transition history (default 1000).
- enable_last_state_recovery: If True, machine can resume from last recorded state.
Methods
spawn(coro: Coroutine) -> asyncio.Task
Start a background coroutine and track it. Auto-cancelled on fault/reset.
cancel_tasks() -> None
Cancel all tracked tasks and timeouts.
set_timeout(state_name: str, seconds: float, trigger_fn: Callable[[], str]) -> None
Schedule a trigger if state_name stays active too long.
record_last_state() -> None
Save current state for recovery.
get_last_state() -> Optional[str]
Return most recently recorded state.
start() -> None
Start the machine: fire recover__{last_state} if a recorded state is available, otherwise fire start.
Properties
history -> list[dict[str, Any]]
Full transition history with timestamps/durations.
get_last_history_entries(n: int) -> list[dict[str, Any]]
Return last n transitions.
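A bounded history with timestamps and durations can be approximated with a `collections.deque`. The sketch below is an assumption about the general shape, not the library's implementation: `HistorySketch`, `record`, and the entry keys (`state`, `timestamp`, `duration_s`) are hypothetical.

```python
import time
from collections import deque
from typing import Any


class HistorySketch:
    """Illustrative bounded transition history; field names are assumptions."""

    def __init__(self, history_size: int = 1000) -> None:
        # A deque with maxlen drops the oldest entry once the limit is reached.
        self._entries: deque[dict[str, Any]] = deque(maxlen=history_size)
        self._entered_at = time.monotonic()

    def record(self, state: str) -> None:
        now = time.monotonic()
        if self._entries:
            # Close out the previous entry with the time spent in that state.
            self._entries[-1]["duration_s"] = now - self._entered_at
        self._entries.append(
            {"state": state, "timestamp": time.time(), "duration_s": None}
        )
        self._entered_at = now

    @property
    def history(self) -> list[dict[str, Any]]:
        return list(self._entries)

    def get_last_history_entries(self, n: int) -> list[dict[str, Any]]:
        return list(self._entries)[-n:] if n > 0 else []


log = HistorySketch(history_size=2)
for name in ("picking", "placing", "homing"):
    log.record(name)

print([entry["state"] for entry in log.history])
# ['placing', 'homing']
```

The current state's duration stays `None` in this sketch until the next transition closes it out.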
Decorators
@on_enter_state(state: State)
Bind function to run on entry.
@on_exit_state(state: State)
Bind function to run on exit.
@auto_timeout(seconds: float, trigger: Trigger)
Auto-trigger if timeout expires.
@guard(*triggers: Trigger)
Guard transition; blocks if function returns False.
@on_state_change
Global callback (old_state, new_state, trigger) fired after each transition.
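The blocking behaviour of a guard can be illustrated without the library. In the sketch below, `GuardSketch` and its `trigger` method are hypothetical; it assumes a trigger runs only when every guard registered for it returns True.

```python
from typing import Callable


class GuardSketch:
    """Illustrative guard check; not the library's @guard implementation."""

    def __init__(self) -> None:
        self.state = "fault"
        self.estop_pressed = True
        # Map each trigger name to the guard functions that must all pass.
        self._guards: dict[str, list[Callable[[], bool]]] = {
            "reset": [lambda: not self.estop_pressed],
        }

    def trigger(self, name: str, dest: str) -> bool:
        """Move to dest if all guards for this trigger pass; report whether it moved."""
        if not all(check() for check in self._guards.get(name, [])):
            return False  # blocked: state is left unchanged
        self.state = dest
        return True


machine = GuardSketch()
blocked = machine.trigger("reset", "ready")   # e-stop still pressed
machine.estop_pressed = False
allowed = machine.trigger("reset", "ready")   # guard now passes

print(blocked, allowed, machine.state)
# False True ready
```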
Router
def build_router(
    machine: StateMachine,
    triggers: Optional[list[Trigger]] = None
) -> fastapi.APIRouter
Exposes endpoints:
- GET /state
- GET /history
- POST /<trigger>
- GET /diagram.svg
🔍 Troubleshooting & FAQ
- Diagram endpoint returns 503 → Graphviz not installed.
- Transitions blocked unexpectedly → Check guard conditions.
- Callbacks not firing → Only successful transitions trigger them.
- State not restored after restart → Ensure enable_last_state_recovery=True.
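For the Graphviz case, a quick way to check the environment is to look for the `dot` executable on PATH. This sketch assumes diagram generation relies on the Graphviz command-line tools; the helper name `graphviz_available` is hypothetical.

```python
import shutil


def graphviz_available() -> bool:
    """Return True if the Graphviz `dot` executable can be found on PATH."""
    return shutil.which("dot") is not None


if graphviz_available():
    print("Graphviz found: diagram generation should be possible.")
else:
    print("Graphviz not found: install it to enable diagram generation.")
```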