Flows & nodes¶
What it is / when to use it¶
PenguiFlow’s core runtime executes a directed graph of async nodes with:
- bounded queues (backpressure),
- per-node reliability policy (timeouts + retries),
- optional typed validation (ModelRegistry),
- structured runtime events (FlowEvent) for observability.
Use the core runtime when you want deterministic pipelines or agent graphs where the graph topology is the orchestrator.
Non-goals / boundaries¶
- The runtime is not a planner. It will not choose tools or invent steps (see ReactPlanner for that).
- It does not persist state by default. Durability is via an optional StateStore.
- It does not enforce multi-tenant authz; tenant scoping is a contract you implement (typically via message headers and separate ingress).
Contract surface¶
Nodes¶
Node wraps an async function with metadata:
- signature: async def fn(message, ctx) -> Any
- NodePolicy controls validation, timeouts, and retries
- allow_cycle=True permits self-cycles for loop-style graphs
NodePolicy (runtime):
- validate: "both" | "in" | "out" | "none"
- timeout_s: per-invocation timeout
- max_retries: retry count (total attempts = max_retries + 1)
- backoff_base, backoff_mult, max_backoff: exponential backoff parameters
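To make the retry arithmetic concrete, here is a minimal standard-library sketch of a timeout plus retry loop with capped exponential backoff. It is not PenguiFlow's implementation: the helper names `backoff_delay` and `call_with_policy`, the default values, and the delay formula `base * mult**attempt` are assumptions chosen for illustration.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional


def backoff_delay(attempt: int, base: float, mult: float, cap: float) -> float:
    """Delay before the retry that follows 0-based `attempt`, capped at `cap`.

    Assumed formula: base * mult**attempt. The library's exact formula may differ.
    """
    return min(base * (mult ** attempt), cap)


async def call_with_policy(
    fn: Callable[[], Awaitable[Any]],
    *,
    timeout_s: float,
    max_retries: int,
    backoff_base: float = 0.5,   # illustrative default, not the library's
    backoff_mult: float = 2.0,   # illustrative default, not the library's
    max_backoff: float = 8.0,    # illustrative default, not the library's
) -> Any:
    """Invoke `fn` up to max_retries + 1 times, bounding each call by timeout_s."""
    last_exc: Optional[Exception] = None
    for attempt in range(max_retries + 1):
        try:
            return await asyncio.wait_for(fn(), timeout=timeout_s)
        except Exception as exc:  # a timeout is treated like any other failure
            last_exc = exc
            if attempt < max_retries:
                delay = backoff_delay(attempt, backoff_base, backoff_mult, max_backoff)
                await asyncio.sleep(delay)
    assert last_exc is not None
    raise last_exc
```

With `max_retries=2`, a function that fails twice and then succeeds is called exactly three times, which matches the "total attempts = max_retries + 1" rule above.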
Graph construction¶
Graphs are defined by adjacency tuples:
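from penguiflow import Node, create

# a and b are Node instances defined elsewhere.
flow = create(
    a.to(b),  # edge a -> b
    b.to(),   # b has no successors, so it is an egress node
)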
create(...) returns a PenguiFlow runtime with key knobs:
- queue_maxsize: bounded queue size per edge (default 64, <= 0 disables backpressure)
- allow_cycles: allow cycles in the graph (default False)
- middlewares: async hooks receiving FlowEvent
- emit_errors_to_rookery: route terminal FlowError objects to the sink so callers can fetch them
- state_store: persist runtime events and remote bindings
- message_bus: optional bus integration for edge traffic
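The effect of a bounded queue can be seen with plain `asyncio.Queue`. The sketch below does not use PenguiFlow; it only illustrates backpressure in general, assuming one producer and one consumer joined by a single bounded edge. The function name `demo_backpressure` is hypothetical.

```python
import asyncio
from typing import List


async def demo_backpressure(maxsize: int = 2, items: int = 5) -> List[str]:
    """Record the order of put/get steps on a bounded queue."""
    queue: asyncio.Queue = asyncio.Queue(maxsize=maxsize)
    log: List[str] = []

    async def producer() -> None:
        for i in range(items):
            await queue.put(i)  # suspends here while the queue is full
            log.append(f"put {i}")

    async def consumer() -> None:
        for _ in range(items):
            item = await queue.get()
            log.append(f"got {item}")
            await asyncio.sleep(0)  # yield control, standing in for real work

    await asyncio.gather(producer(), consumer())
    return log


if __name__ == "__main__":
    for line in asyncio.run(demo_backpressure()):
        print(line)
```

At no point does the producer get more than `maxsize` items ahead of the consumer, which is the property a bounded `queue_maxsize` is meant to give each edge.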
Endpoints: OpenSea and Rookery¶
The runtime synthesizes two endpoints:
- OpenSea: ingress (feeds nodes with no incoming edges)
- Rookery: egress sink (receives output from nodes with no outgoing edges)
This is why await flow.emit(...) and await flow.fetch() work “out of the box”.
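The ingress and egress rules above can be expressed over a plain adjacency mapping. This is a sketch of the rule, not the runtime's code; `find_endpoints` and the string node names are hypothetical.

```python
from typing import Dict, List, Set, Tuple


def find_endpoints(adjacency: Dict[str, List[str]]) -> Tuple[Set[str], Set[str]]:
    """Return (ingress, egress) node names for a graph given as node -> successors."""
    nodes: Set[str] = set(adjacency)
    for successors in adjacency.values():
        nodes.update(successors)

    has_incoming = {s for successors in adjacency.values() for s in successors}
    ingress = nodes - has_incoming                       # would be fed by OpenSea
    egress = {n for n in nodes if not adjacency.get(n)}  # would feed Rookery
    return ingress, egress


if __name__ == "__main__":
    print(find_endpoints({"a": ["b"], "b": []}))
```

For the two-node graph `a -> b`, `a` is the only ingress node and `b` the only egress node. A graph in which every node has a successor yields an empty egress set, which is one reason `fetch()` can wait forever.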
Messages (payload-only vs envelopes)¶
- Payload-only nodes pass Pydantic models directly.
- Envelope-based flows pass Message(payload=..., headers=..., trace_id=...) to enable cancellation, deadlines, and streaming.
See Messages & envelopes.
Operational defaults (recommended)¶
- Keep queue_maxsize bounded (64 is a safe starting point).
- Use ModelRegistry + NodePolicy(validate="both") for typed boundaries.
- Use Message envelopes for production features (streaming/cancel/deadlines).
- Always await flow.stop() to avoid orphaned node workers.
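One way to make the last recommendation hard to forget is a small async context manager that pairs `run()` with `stop()`. The helper `running` is hypothetical and not part of PenguiFlow; `StubFlow` is a stand-in that mimics only the `run()` / `await stop()` shape described on this page so the sketch runs on its own.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import Any, AsyncIterator


@asynccontextmanager
async def running(flow: Any, **run_kwargs: Any) -> AsyncIterator[Any]:
    """Start `flow`, yield it, and always await stop() on exit."""
    flow.run(**run_kwargs)
    try:
        yield flow
    finally:
        await flow.stop()


class StubFlow:
    """Stand-in with the same run()/stop() shape, used only for this demo."""

    def __init__(self) -> None:
        self.is_running = False

    def run(self, **kwargs: Any) -> None:
        self.is_running = True

    async def stop(self) -> None:
        self.is_running = False


async def demo() -> bool:
    flow = StubFlow()
    try:
        async with running(flow):
            raise RuntimeError("simulated failure while the flow is running")
    except RuntimeError:
        pass
    return flow.is_running  # stop() was awaited despite the exception


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The same effect can be had with a plain `try` / `finally` around `emit` and `fetch`; the context manager only packages that pattern.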
Failure modes & recovery¶
Cycle errors¶
If your graph contains a cycle and allow_cycles=False, create(...) raises CycleError.
Fix:
- remove the cycle, or
- set allow_cycles=True (and ensure nodes are cycle-safe).
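To check a topology for cycles before handing it to `create(...)`, a generic detector such as Kahn's algorithm is enough. The sketch below is independent of PenguiFlow and makes no claim about how the runtime detects cycles; `has_cycle` and the adjacency-mapping input are assumptions.

```python
from collections import deque
from typing import Dict, List


def has_cycle(adjacency: Dict[str, List[str]]) -> bool:
    """Kahn's algorithm: a graph has a cycle iff not every node can be
    removed by repeatedly deleting nodes with no incoming edges."""
    nodes = set(adjacency) | {s for succs in adjacency.values() for s in succs}
    indegree = {n: 0 for n in nodes}
    for succs in adjacency.values():
        for s in succs:
            indegree[s] += 1

    ready = deque(n for n in nodes if indegree[n] == 0)
    removed = 0
    while ready:
        node = ready.popleft()
        removed += 1
        for s in adjacency.get(node, []):
            indegree[s] -= 1
            if indegree[s] == 0:
                ready.append(s)
    return removed != len(nodes)


if __name__ == "__main__":
    print(has_cycle({"a": ["b"], "b": []}))
    print(has_cycle({"a": ["b"], "b": ["a"]}))
```

A self-loop (`{"a": ["a"]}`) also counts as a cycle under this check.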
Nothing ever reaches fetch()¶
Typical causes:
- no egress node exists (all nodes have successors), so nothing routes to Rookery
- your egress node returns None (no message emitted)
- you never call flow.run(...) before emitting
Fix:
- ensure an egress node exists and returns a value (or emits to a sink),
- call flow.run(...) before emit(...).
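While diagnosing a silent sink, it helps to bound the wait so that a hang becomes an explicit error. The helper below is a hypothetical wrapper built on `asyncio.wait_for`; it accepts any zero-argument coroutine function, so passing `flow.fetch` is an assumption about how you would use it rather than a documented PenguiFlow feature.

```python
import asyncio
from typing import Any, Awaitable, Callable


async def fetch_with_deadline(
    fetch: Callable[[], Awaitable[Any]], timeout_s: float
) -> Any:
    """Await fetch() but fail loudly instead of hanging forever."""
    try:
        return await asyncio.wait_for(fetch(), timeout=timeout_s)
    except asyncio.TimeoutError:
        raise TimeoutError(
            f"no result reached the sink within {timeout_s}s; "
            "check that an egress node exists and returns a value"
        ) from None


async def demo() -> str:
    sink: asyncio.Queue = asyncio.Queue()  # stand-in for the egress sink
    try:
        await fetch_with_deadline(sink.get, 0.01)  # nothing was emitted yet
    except TimeoutError:
        pass
    await sink.put("result")
    return await fetch_with_deadline(sink.get, 0.01)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In an application this might be called as `await fetch_with_deadline(flow.fetch, 5.0)`.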
Validation failures¶
If NodePolicy.validate != "none" and your registry is missing a model entry, flow.run(registry=...) fails fast.
Fix:
- register models for every validated node name, or set validate="none" for that node.
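The fail-fast rule amounts to a coverage check between validated node names and registered names. The sketch below models that check with plain dictionaries and sets; `missing_models` is hypothetical and does not inspect a real `ModelRegistry`.

```python
from typing import Dict, List, Set


def missing_models(validate_by_node: Dict[str, str], registered: Set[str]) -> List[str]:
    """Return node names whose policy requests validation but which have
    no registered model entry.

    validate_by_node maps node name -> "both" | "in" | "out" | "none".
    """
    return sorted(
        name
        for name, mode in validate_by_node.items()
        if mode != "none" and name not in registered
    )


if __name__ == "__main__":
    policies = {"to_upper": "both", "audit": "none", "enrich": "in"}
    print(missing_models(policies, registered={"to_upper"}))
```

Here `enrich` would be reported as missing, while `audit` is skipped because its policy opts out of validation.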
Observability¶
The runtime emits structured FlowEvent objects and logs them (logger: penguiflow.core).
You can attach middleware to capture events:
- pass middlewares=[...] to create(...), or
- call flow.add_middleware(...).
If state_store is configured, runtime events are persisted via StateStore.save_event(...).
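A middleware can be as small as an async callable that records what it receives. The sketch below assumes only what this page states, namely that middlewares are async hooks receiving an event object. `EventCollector` is hypothetical, and `StubEvent` with its `event_type` and `node_name` fields is a stand-in, not the real `FlowEvent` schema.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, List


@dataclass
class StubEvent:
    """Stand-in for FlowEvent; field names here are illustrative only."""
    event_type: str
    node_name: str


@dataclass
class EventCollector:
    """Async callable that keeps every event it is handed."""
    events: List[Any] = field(default_factory=list)

    async def __call__(self, event: Any) -> None:
        self.events.append(event)


async def demo() -> int:
    collector = EventCollector()
    # In a real flow the runtime would invoke the hook; here we call it by hand.
    await collector(StubEvent("node_start", "to_upper"))
    await collector(StubEvent("node_success", "to_upper"))
    return len(collector.events)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

An instance would then be passed as `middlewares=[collector]` or attached with `flow.add_middleware(collector)`; check the `FlowEvent` reference for the actual fields before filtering on them.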
Security / multi-tenancy notes¶
- Don’t mix tenants inside a trace. Use Headers.tenant and enforce scoping at ingress/egress boundaries.
- Avoid storing secrets in message meta if you persist events or logs.
- Treat trace_id and fetch(trace_id=...) as an authorization surface in apps (don’t allow a user to fetch/cancel another user’s trace).
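Because the runtime leaves authorization to the application, the last point usually means keeping your own record of who owns each trace and checking it before any fetch or cancel. A minimal in-memory sketch follows; `TraceOwners` is hypothetical, and a real deployment would back it with shared storage.

```python
from typing import Dict


class TraceOwners:
    """Maps trace_id -> owning user so fetch/cancel requests can be checked."""

    def __init__(self) -> None:
        self._owner_by_trace: Dict[str, str] = {}

    def register(self, trace_id: str, user_id: str) -> None:
        # setdefault keeps the first owner, so a later caller cannot
        # take over an existing trace by re-registering it.
        self._owner_by_trace.setdefault(trace_id, user_id)

    def is_owner(self, trace_id: str, user_id: str) -> bool:
        # Unknown traces are denied rather than allowed.
        return self._owner_by_trace.get(trace_id) == user_id


if __name__ == "__main__":
    owners = TraceOwners()
    owners.register("trace-1", "alice")
    print(owners.is_owner("trace-1", "alice"))
    print(owners.is_owner("trace-1", "bob"))
```

An ingress handler would call `register` when it emits a message and `is_owner` before honoring any request that names a `trace_id`.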
Runnable example: minimal typed flow¶
from __future__ import annotations

import asyncio

from pydantic import BaseModel

from penguiflow import ModelRegistry, Node, NodePolicy, create


class In(BaseModel):
    text: str


class Out(BaseModel):
    upper: str


async def to_upper(msg: In, _ctx) -> Out:
    return Out(upper=msg.text.upper())


async def main() -> None:
    node = Node(to_upper, name="to_upper", policy=NodePolicy(validate="both"))
    registry = ModelRegistry()
    registry.register("to_upper", In, Out)

    flow = create(node.to())
    flow.run(registry=registry)
    await flow.emit(In(text="hello"))
    result: Out = await flow.fetch()
    await flow.stop()
    print(result.upper)


if __name__ == "__main__":
    asyncio.run(main())
Troubleshooting checklist¶
- RuntimeError: PenguiFlow already running: you called run() twice; create a new flow instance or stop first.
- RuntimeError: PenguiFlow is not running: call flow.run(...) before emitting.
- CycleError: remove cycles or enable allow_cycles=True (and make cycles explicit).
- fetch() hangs: nothing is reaching Rookery; confirm an egress node emits/returns a value.