Concepts¶
What it is / when to use it¶
This page defines the major moving parts of PenguiFlow and how they fit together.
Use it when you want to:
- pick the right “message style” (payload-only vs envelopes),
- understand where reliability and backpressure come from,
- decide when you need the planner vs the core runtime.
If you want runnable code first, start with Quickstart.
Non-goals / boundaries¶
- This page is not a full API reference. It focuses on the mental model and the contracts that matter.
- It does not cover external tool configuration and auth (see Tooling and Tools & integrations).
- It does not prescribe an application architecture; “planner-only”, “runtime-only”, and “mixed” are all valid.
Contract surface (the pieces you compose)¶
Flow (runtime)¶
A flow is a directed graph of async nodes. It provides:
- flow.run(...) / await flow.stop() lifecycle
- await flow.emit(...) ingress (OpenSea)
- await flow.fetch(...) egress (Rookery)
- optional per-trace cancellation: await flow.cancel(trace_id)
Canonical: Flows & nodes.
Node (runtime)¶
A node is an async function wrapped with metadata and policy:
- signature: async def fn(message, ctx) -> Any
- NodePolicy: timeouts, retries, validation
- edges: a.to(b, c) creates bounded queues (backpressure)
Canonical: Errors, retries, timeouts and Concurrency.
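To make the link between bounded queues and backpressure concrete, here is a minimal sketch using only asyncio from the standard library. It does not use PenguiFlow itself; producer, slow_consumer, run_demo and the stats dictionary are hypothetical names chosen for illustration. The point is that a producer awaiting put() on a full queue is suspended until the slower consumer catches up, so queue depth never exceeds the bound.

```python
import asyncio


async def producer(queue, count, stats):
    """Upstream node: pushes work and records the deepest queue it observed."""
    for i in range(count):
        # put() suspends while the queue is full; that pause is the backpressure.
        await queue.put(i)
        stats["peak_depth"] = max(stats["peak_depth"], queue.qsize())
    await queue.put(None)  # sentinel: no more work


async def slow_consumer(queue, stats):
    """Downstream node: slower than the producer on purpose."""
    while (item := await queue.get()) is not None:
        await asyncio.sleep(0.001)  # simulate slow processing
        stats["processed"].append(item)


async def run_demo(maxsize=2, count=10):
    # A bounded queue stands in for the edge between two nodes (assumption).
    queue = asyncio.Queue(maxsize=maxsize)
    stats = {"peak_depth": 0, "processed": []}
    await asyncio.gather(
        producer(queue, count, stats),
        slow_consumer(queue, stats),
    )
    return stats
```

Calling asyncio.run(run_demo()) returns the stats dictionary. An unbounded queue would let the producer race ahead and hide the slow consumer until memory grows, which is why queue depth is worth watching as a signal.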
Message (data + metadata)¶
PenguiFlow supports two message styles:
- Payload-only (fastest start): nodes receive and return plain Pydantic models.
- Envelope-based (recommended for production): nodes pass a Message(payload=..., headers=..., trace_id=...), enabling:
  - per-trace correlation (trace_id),
  - per-trace cancellation,
  - deadlines,
  - streaming chunks that inherit routing metadata,
  - multi-tenant isolation via Headers.tenant.
Canonical: Messages & envelopes.
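The difference between the two styles can be sketched with plain dataclasses. This is not the library's Message or Headers class: Envelope, Headers, payload_only_node and envelope_node below are hypothetical stand-ins, and a plain string replaces the Pydantic model to keep the sketch dependency-free. It shows why an envelope keeps correlation metadata across a hop while a bare payload has nowhere to carry it.

```python
from __future__ import annotations

import uuid
from dataclasses import dataclass, field, replace
from typing import Any


@dataclass(frozen=True)
class Headers:
    """Hypothetical stand-in for routing metadata."""
    tenant: str


@dataclass(frozen=True)
class Envelope:
    """Hypothetical stand-in for an envelope: payload plus metadata."""
    payload: Any
    headers: Headers
    trace_id: str = field(default_factory=lambda: uuid.uuid4().hex)


def payload_only_node(text: str) -> str:
    # Payload-only style: the node sees the data and nothing else.
    return text.upper()


def envelope_node(msg: Envelope) -> Envelope:
    # Envelope style: replace() swaps the payload and copies headers and
    # trace_id unchanged, so downstream nodes can still correlate the request.
    return replace(msg, payload=msg.payload.upper())
```

With this shape, every node in a chain returns an envelope carrying the same trace_id and tenant as the one it received, which is what per-trace cancellation, deadlines and tenant isolation rely on.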
Context (in-node capabilities)¶
Every node receives a ctx that can:
- emit follow-up work (await ctx.emit(...)),
- emit streaming chunks (await ctx.emit_chunk(...)),
- access trace-scoped metadata used for observability and control (cancellation / deadlines).
Canonical: Streaming and Cancellation.
Planner (ReactPlanner)¶
The planner is an LLM-driven loop that selects tools and orchestrates their execution (including parallel calls) with:
- typed action schema and repair attempts,
- pause/resume (HITL) and session semantics,
- optional short-term memory,
- trajectory logging/observability hooks.
Canonical: ReactPlanner overview.
Operational defaults (safe starting points)¶
- Prefer bounded queues (queue_maxsize > 0) and treat queue depth as a first-class signal.
- Prefer envelopes (Message) when you need streaming, cancellation, deadlines, multi-tenant boundaries, or deterministic correlation.
- Keep trace_id unique per request/session; treat it as part of your authorization story (don’t let a user fetch/cancel another user’s trace).
- In production, add:
  - middlewares for structured FlowEvent logging, and/or
  - a StateStore for durability and event persistence.
Failure modes & recovery¶
- fetch() hangs: nothing reached the Rookery sink (no egress node, egress returns None, or you didn’t call run()).
- Cross-trace mixups: you reused trace_id across concurrent requests (use trace-scoped fetch or unique trace ids).
- Streaming “does nothing”: you’re using payload-only messages; switch to envelope style and call ctx.emit_chunk(parent=Message(...), ...).
- Retries amplify side effects: the node is not idempotent (use idempotency keys, or emit side effects only once).
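The retry failure mode is easiest to see in a small simulation. The sketch below uses only the standard library and does not reproduce PenguiFlow's retry machinery: EmailSender, TransientError, run_with_retries and demo are hypothetical names, and the retry loop is a simplified assumption about how a retry policy behaves. The node fails twice after its side effect has already run, and the idempotency key keeps the side effect from repeating.

```python
import asyncio


class TransientError(Exception):
    """Stands in for a retryable failure (timeout, flaky dependency)."""


class EmailSender:
    """Hypothetical side-effecting client guarded by idempotency keys."""

    def __init__(self):
        self.sent = []
        self._done = set()

    def send_once(self, key, recipient):
        if key in self._done:
            return False  # already performed for this key: skip
        self.sent.append(recipient)
        self._done.add(key)
        return True


async def run_with_retries(fn, max_retries=3):
    """Simplified retry loop: re-run fn until it succeeds or retries run out."""
    for attempt in range(max_retries + 1):
        try:
            return await fn()
        except TransientError:
            if attempt == max_retries:
                raise


async def demo():
    sender = EmailSender()
    attempts = 0

    async def node():
        nonlocal attempts
        attempts += 1
        # Derive the key from the trace and the action, not from the attempt.
        sender.send_once("trace-123:welcome-email", "user@example.com")
        if attempts < 3:
            raise TransientError("fails after the side effect ran")
        return "ok"

    result = await run_with_retries(node, max_retries=3)
    return result, attempts, sender.sent
```

Here the node body runs three times but the email list ends up with one entry. In a real deployment the set of completed keys would need to live in shared, durable storage rather than in process memory.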
Canonical runbooks live in the core pages.
Observability¶
The runtime emits structured FlowEvent for:
- node lifecycle (node_start, node_success, node_error, node_timeout, …),
- queue depth and pending/inflight counts (critical for backpressure debugging),
- trace cancellation and deadline skips.
Operationally:
- attach a middleware (e.g. penguiflow.middlewares.log_flow_events) early, and
- decide where you persist events (often via StateStore) before production rollout.
See Logging and Telemetry patterns.
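As a rough picture of what an event-logging middleware does, the sketch below models an event as a dataclass and a middleware as an async callable that receives one event at a time. Everything here is an assumption made for illustration: Event and its field names (event_type, node_name, trace_id, queue_depth), JsonEventLog and the depth_warning threshold are hypothetical and are not the library's FlowEvent schema or middleware interface.

```python
import asyncio
import json
from dataclasses import asdict, dataclass


@dataclass(frozen=True)
class Event:
    """Hypothetical event record; field names are assumptions."""
    event_type: str
    node_name: str
    trace_id: str
    queue_depth: int


class JsonEventLog:
    """Hypothetical middleware: structured logging plus a queue-depth check."""

    def __init__(self, depth_warning=8):
        self.lines = []      # stand-in for a log sink or event store
        self.warnings = []   # nodes whose queue depth crossed the threshold
        self.depth_warning = depth_warning

    async def __call__(self, event):
        # One JSON object per event keeps logs machine-parseable.
        self.lines.append(json.dumps(asdict(event), sort_keys=True))
        if event.queue_depth >= self.depth_warning:
            self.warnings.append((event.node_name, event.queue_depth))


async def replay(middleware, events):
    """Feed events to the middleware in order, as a runtime might."""
    for event in events:
        await middleware(event)
```

Replaying a handful of events through JsonEventLog yields one JSON line per event and a warning entry for any node whose queue depth reached the threshold, which is the kind of signal backpressure debugging depends on.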
Security / multi-tenancy notes¶
- Always set Headers.tenant when you use envelopes, and keep tenant boundaries consistent across a trace.
- Don’t put secrets in payloads or message meta if you persist events/logs; prefer secret managers + redaction.
- Treat trace_id + fetch(trace_id=...) + cancel(trace_id) as sensitive control surfaces in applications.
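One way to treat trace ids as a control surface is to check ownership in the application layer before any trace-scoped fetch or cancel is forwarded to the flow. The sketch below is a hypothetical application-side helper, not part of PenguiFlow: TraceRegistry, new_trace and authorize are names invented for illustration.

```python
import uuid


class TraceRegistry:
    """Hypothetical application-side map from trace_id to its owning user."""

    def __init__(self) -> None:
        self._owners: dict[str, str] = {}

    def new_trace(self, user_id: str) -> str:
        # A fresh random id per request avoids cross-trace mixups.
        trace_id = uuid.uuid4().hex
        self._owners[trace_id] = user_id
        return trace_id

    def authorize(self, trace_id: str, user_id: str) -> None:
        # Call this before forwarding a fetch or cancel for trace_id.
        owner = self._owners.get(trace_id)
        if owner is None or owner != user_id:
            raise PermissionError("trace does not belong to this user")
```

Unknown trace ids are rejected the same way as foreign ones, so the error does not reveal whether a guessed id exists.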
Runnable examples¶
Run a minimal flow:
uv run python examples/quickstart/flow.py
Run a streaming example (chunks + final answer):
uv run python examples/roadmap_status_updates/flow.py
If you’re building an LLM agent, start with the planner template:
uv run penguiflow new my-agent --template react
uv run penguiflow dev --project-root my-agent
Troubleshooting checklist¶
- Need cancellation/deadlines/streaming: switch to envelopes (Message) and use trace_id per request.
- Need parallel fan-out + join: use join_k and ensure you pass trace_id (see Concurrency).
- Need pause/resume / HITL: use the planner (see Pause/resume).
- Need tool integrations: use ToolNode and configure auth (see Tools configuration).