---
title: "Communication"
slug: "communication"
updated: 2025-12-10T19:25:41Z
published: 2025-12-10T19:25:41Z
---

> ## Documentation Index
> Fetch the complete documentation index at: https://docs.vention.io/llms.txt
> Use this file to discover all available pages before exploring further.

# Communication

A thin, FastAPI-powered RPC layer for machine-apps that exposes Connect-compatible request-response and server-streaming endpoints — plus .proto generation from Python decorators, allowing typed SDKs to be generated separately via Buf.

## **Table of Contents**

- [✨ Features](https://pypi.org/project/vention-communication/#-features)
- [🧠 Concepts & Overview](https://pypi.org/project/vention-communication/#-concepts--overview)
- [⚙️ Installation & Setup](https://pypi.org/project/vention-communication/#%EF%B8%8F-installation--setup)
- [🚀 Quickstart Tutorial](https://pypi.org/project/vention-communication/#-quickstart-tutorial)
- [🛠 How-to Guides](https://pypi.org/project/vention-communication/#-how-to-guides)
- [📖 API Reference](https://pypi.org/project/vention-communication/#-api-reference)
- [🔍 Troubleshooting & FAQ](https://pypi.org/project/vention-communication/#-troubleshooting--faq)

## **✨ Features**

- **Zero boilerplate RPCs**: Expose any async Python function as a network API with a single decorator.
- **Strong typing**: Request and response models derived directly from Python annotations.
- **Built-in schema emission**: Generates a .proto file at runtime, ready for SDK code generation.
- **Single service surface**: All methods exposed under `/rpc/&lt;package.Service&gt;/&lt;Method&gt;`.
- **Connect-compatible transport**: Works seamlessly with `@connectrpc/connect-web` and `connectrpc-python`.

## **🧠 Concepts & Overview**

vention-communication eases cross-process data exchange by turning annotated Python functions into typed RPC endpoints automatically.

`@action()` → defines a one-request / one-response method.

`@stream()` → defines a live telemetry or event stream that other services can subscribe to.

`VentionApp.finalize()` → scans for decorators, builds a Connect router, and emits a .proto schema.

Once the .proto exists, SDKs for TypeScript, Python, or Go can be generated using Buf. The result: your frontend gets auto-completed methods for every RPC, no HTTP or JSON code required.

### **Core Concepts**

- **Actions (Request-Response)** — send a request, get a response back. Input and output types are inferred from function annotations. If either is missing, `google.protobuf.Empty` is used.
- **Streams (Server streaming)** — continuous updates broadcast to all subscribers. Each stream can optionally replay the last value when someone subscribes. Queues default to size-1 to always show the latest value.
- **Service Surface** — all actions and streams belong to one service, e.g. `vention.app.v1.&lt;YourAppName&gt;Service`, with routes mounted under `/rpc`.
- **Proto Generation** — `VentionApp.finalize()` writes a .proto to disk, capturing all decorated RPCs, inferred models, and scalar wrappers. SDK generation (via Buf) is handled externally.

## **⚙️ Installation & Setup**

**Requirements:**

- Python 3.10+
- FastAPI
- Uvicorn (for serving)

**Install:**

```plaintext
pip install vention-communication
```

**Optional client libraries:**

- TypeScript: `@connectrpc/connect-web`
- Python: `connectrpc` with `httpx.AsyncClient`

## **🚀 Quickstart Tutorial**

A complete "hello world" in three steps.

### **1. Define your RPCs**

```plaintext
from pydantic import BaseModel
from vention_communication import VentionApp, action, stream
import time, random

class PingRequest(BaseModel):
    message: str

class PingResponse(BaseModel):
    message: str

class Heartbeat(BaseModel):
    value: str
    timestamp: int

app = VentionApp(name="DemoApp", emit_proto=True)

@action()
async def ping(req: PingRequest) -> PingResponse:
    return PingResponse(message=f"Pong: {req.message}")

@stream(name="heartbeat", payload=Heartbeat, replay=True)
async def heartbeat():
    """Broadcast a live heartbeat value to all subscribers."""
    return Heartbeat(value=f"{random.uniform(0,100):.2f}", timestamp=int(time.time()))

app.finalize()
```

**Run:**

```plaintext
uvicorn demo.main:app --reload
```

Endpoints are automatically registered under `/rpc/vention.app.v1.DemoAppService.`

### **2. Generated .proto**

After startup, `proto/app.proto` is emitted automatically.

You can now use Buf or protoc to generate client SDKs:

```plaintext
buf generate --template buf.gen.ts.yaml
buf generate --template buf.gen.python.yaml
```

SDK generation is external to vention-communication — allowing you to control versions and plugins.

### **3. Example TypeScript Client**

```plaintext
import { createClient } from "@connectrpc/connect";
import { createConnectTransport } from "@connectrpc/connect-web";
import { DemoAppService } from "./gen/connect/proto/app_connect";

const transport = createConnectTransport({
  baseUrl: "http://localhost:8000/rpc",
  useBinaryFormat: false,
});

const client = createClient(DemoAppService, transport);

const res = await client.ping({ message: "Hello" });
console.log(res.message);

for await (const hb of client.heartbeat({})) {
  console.log("Heartbeat", hb.value, hb.timestamp);
}
```

## **🛠 How-to Guides**

### **Add a new request-response endpoint**

```plaintext
@action()
async def get_status() -> dict:
    return {"ok": True}
```

### **Add a new stream**

```plaintext
@stream(name="Status", payload=dict)
async def publish_status() -> dict:
    return {"ok": True}
```

### **Emit proto to a custom path**

```plaintext
app = VentionApp(name="MyService", emit_proto=True, proto_path="out/myservice.proto")
app.finalize()
```

## **📖 API Reference**

### **VentionApp**

```plaintext
VentionApp(
  name: str = "VentionApp",
  *,
  emit_proto: bool = False,
  proto_path: str = "proto/app.proto",
  **fastapi_kwargs
)
```

**Methods:**

- `.register_rpc_plugin(bundle: RpcBundle)` — merges external action/stream definitions (e.g., from state-machine or storage).
- `.finalize()` — registers routes, emits .proto, and makes publishers available.

**Attributes:**

- `connect_router`: internal FastAPI router for Connect RPCs.
- `proto_path`: location of the emitted .proto.

### **Decorators**

```plaintext
@action(name: Optional[str] = None)
# → Registers a request-response handler

@stream(
    name: str,
    payload: type,
    replay: bool = True,
    queue_maxsize: int = 1,
    policy: Literal["latest", "fifo"] = "latest"
)
# → Registers a server-streaming RPC and publisher
```

**Stream Parameters:**

- `name`: Unique name for the stream
- `payload`: Type of data to stream (Pydantic model or JSON-serializable type)
- `replay`: Whether new subscribers receive the last value (default: `True`)
- `queue_maxsize`: Maximum items per subscriber queue (default: `1`)
- `policy`: Delivery policy when queue is full - `"latest"` drops old items, `"fifo"` waits for space (default: `"latest"`)

### **Stream Configuration Options**

When creating a stream with `@stream()`, you can configure how updates are delivered to subscribers:

#### `replay`******(default:******`True`**)**

Controls whether new subscribers receive the last published value immediately when they subscribe.

- `replay=True`: New subscribers instantly receive the most recent value (if one exists). Useful for state streams where clients need the current state immediately upon connection.
- `replay=False`: New subscribers only receive values published after they subscribe. Useful for event streams where you only want to see new events.

#### `queue_maxsize`******(default:******`1`**)**

The maximum number of items that can be queued for each subscriber before the delivery policy kicks in.

- `queue_maxsize=1`: Only the latest value is kept. Perfect for state streams where you only care about the current state.
- `queue_maxsize=N` (N > 1): Allows buffering up to N items. Useful when subscribers might process items slower than they're published, but you still want to limit memory usage.

```plaintext
# Only keep latest temperature reading
@stream(name="Temperature", payload=Temperature, queue_maxsize=1)

# Buffer up to 10 sensor readings
@stream(name="SensorData", payload=SensorReading, queue_maxsize=10)
```

#### `policy`******(default:******`"latest"`**)**

Defines what happens when a subscriber’s queue is full and a new value is published.

Each subscriber maintains its own in-memory queue of pending messages. When you publish faster than a client can consume, the queue eventually fills — the policy determines what happens next.

`policy="latest"` — “drop oldest, never block”

- The publisher never waits.
- If a subscriber’s queue is full, the oldest item is dropped and the new one is inserted immediately.
- Fast subscribers receive every message; slow subscribers skip intermediate values but always see the most recent state.

✅ Pros

- Zero backpressure — publisher performance unaffected by slow clients.
- Keeps UI dashboards and telemetry feeds current (“latest value always wins”).
- Ideal for high-frequency data (positions, sensor readings, machine state).

⚠️ Cons

- Drops messages for slow clients (they may miss intermediate updates).
- Subscribers can diverge — one may receive more updates than another.

Example:

```plaintext
@stream(name="Temperature", payload=TempReading,
        policy="latest", queue_maxsize=1)
# → publisher never blocks; subscribers always see the most recent temperature
```

`policy="fifo"` — “deliver all, may block”

- The publisher awaits until there’s space in every subscriber’s queue.
- Guarantees that all messages are delivered in order to every subscriber.
- A slow subscriber can stall the entire stream, because the distributor waits for that subscriber’s queue to make room before continuing.

✅ Pros

- Preserves every event and strict ordering.
- Reliable for command sequences, audit logs, and event-driven logic.

⚠️ Cons

- One slow or paused subscriber can block all others.
- Publishing rate is limited by the slowest client.
- In extreme cases, a throttled browser or dropped connection can cause the distributor to stall until the queue frees or the subscriber is removed.

Example:

```plaintext
@stream(name="Events", payload=MachineEvent,
        policy="fifo", queue_maxsize=100)
# → guarantees ordered delivery but can back-pressure the publisher
```

**Common Combinations:**

- **State monitoring** (default): `replay=True`, `queue_maxsize=1`, `policy="latest"` — subscribers get current state immediately and always see the latest value.
- **Event streaming**: `replay=False`, `queue_maxsize=100`, `policy="fifo"` — subscribers only see new events and process them in order.

## **🔍 Troubleshooting & FAQ**

**Q: Can I disable proto generation at runtime?**

Yes — set `emit_proto=False` in `VentionApp(...)`.

**Q: Publishing raises******`KeyError: Unknown stream`**.**

Ensure `app.finalize()` has been called before publishing or subscribing.

**Q: How do I integrate this with other libraries (state machine, storage, etc.)?**

Use `app.register_rpc_plugin()` to merge additional RPC definitions before calling `.finalize()`.
