Playbooks (subflows)
What it is / when to use it
A playbook is a subflow you execute as a unit, typically to:
- reuse a complex pipeline (retrieve → rank → summarize),
- isolate a “mini graph” behind a single call boundary,
- enforce consistent timeouts/retries and observability around a reusable workflow.
PenguiFlow exposes call_playbook(...) to execute a playbook and return its first egress result.
Non-goals / boundaries
- Playbooks are not a module system; they are a runtime composition tool.
- A playbook is a separate PenguiFlow instance. It does not automatically share queues, middleware, or state with the parent runtime.
- call_playbook is a best-effort integration point; if you need a fully distributed “subflow execution service”, use remote transports plus a StateStore.
Contract surface
Playbook factory
A playbook is a callable (a factory) that returns a (flow, registry) tuple:
- flow: a PenguiFlow instance, and
- registry: an optional ModelRegistry used for runtime validation (None if you don't need one).
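A common wrinkle is parameterizing a playbook (for example, how many documents to retrieve) while still handing call_playbook a plain factory. The sketch below assumes, as in the minimal example later on this page, that call_playbook invokes the factory with no arguments. `PlaybookFactory`, `build_retrieval_playbook`, and `make_factory` are hypothetical names, and a dict stands in for the PenguiFlow instance so the snippet runs without PenguiFlow installed.

```python
from __future__ import annotations

import functools
from typing import Any, Callable, Optional, Tuple

# Stand-ins so the sketch runs without PenguiFlow. In real code FlowT would be
# a PenguiFlow instance from create(...) and RegistryT a ModelRegistry or None.
FlowT = Any
RegistryT = Optional[Any]

# Hypothetical alias: a zero-argument callable returning (flow, registry).
PlaybookFactory = Callable[[], Tuple[FlowT, RegistryT]]


def build_retrieval_playbook(top_k: int) -> Tuple[FlowT, RegistryT]:
    """Hypothetical parameterized builder; returns a fresh (flow, registry) pair."""
    flow = {"name": "retrieval", "top_k": top_k}  # placeholder for create(...)
    return flow, None  # no ModelRegistry


def make_factory(top_k: int) -> PlaybookFactory:
    # Bind parameters up front so the result matches the zero-argument shape.
    return functools.partial(build_retrieval_playbook, top_k)


factory = make_factory(top_k=5)
flow_a, registry_a = factory()
flow_b, _ = factory()
print(flow_a, registry_a, flow_a is flow_b)
```

Because the factory, not a flow instance, is what gets passed around, each invocation can build its own flow, which fits the boundary above that a playbook does not share state with its parent.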
call_playbook(...)
call_playbook(playbook, parent_msg, timeout=None, runtime=None):
- runs the playbook flow,
- emits parent_msg as the ingress message,
- fetches the first Rookery result, and
- returns result.payload if the result is a Message; otherwise it returns the raw result.
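The four steps above can be modelled in plain asyncio to see how the pieces fit. This is a hypothetical sketch, not PenguiFlow's implementation: `ToyFlow`, `call_playbook_sketch`, and the stripped-down `Message` dataclass are stand-ins, and applying the timeout to the fetch and always stopping the subflow afterwards are choices made for the sketch rather than documented behaviour.

```python
import asyncio
import uuid
from dataclasses import dataclass, field
from typing import Any, Awaitable, Callable, Optional, Tuple


@dataclass
class Message:  # minimal stand-in for penguiflow.Message
    payload: Any
    trace_id: str = field(default_factory=lambda: uuid.uuid4().hex)


class ToyFlow:
    """Hypothetical single-node flow: ingress queue -> node -> egress queue."""

    def __init__(self, node: Callable[[Any], Awaitable[Any]]) -> None:
        self._node = node
        self._ingress: asyncio.Queue = asyncio.Queue()
        self._egress: asyncio.Queue = asyncio.Queue()  # plays the Rookery role
        self._task: Optional[asyncio.Task] = None

    def run(self) -> None:
        self._task = asyncio.create_task(self._worker())

    async def _worker(self) -> None:
        while True:
            msg = await self._ingress.get()
            await self._egress.put(await self._node(msg))

    async def emit(self, msg: Any) -> None:
        await self._ingress.put(msg)

    async def fetch(self) -> Any:
        return await self._egress.get()

    async def stop(self) -> None:
        if self._task is not None:
            self._task.cancel()
            # Wait for the worker to unwind; swallow its CancelledError.
            await asyncio.gather(self._task, return_exceptions=True)


async def call_playbook_sketch(
    factory: Callable[[], Tuple[ToyFlow, Any]],
    parent_msg: Any,
    timeout: Optional[float] = None,
) -> Any:
    flow, _registry = factory()  # registry is ignored in this sketch
    flow.run()
    try:
        await flow.emit(parent_msg)  # the parent message is the ingress message
        result = await asyncio.wait_for(flow.fetch(), timeout)  # first egress result
    finally:
        await flow.stop()  # tear the subflow down even on timeout
    # Unwrap envelopes; pass anything else through unchanged.
    return result.payload if isinstance(result, Message) else result


async def shout(msg: Message) -> Message:
    return Message(payload=str(msg.payload).upper(), trace_id=msg.trace_id)


async def count(msg: Message) -> int:
    return len(str(msg.payload))


async def demo() -> Tuple[Any, Any]:
    a = await call_playbook_sketch(lambda: (ToyFlow(shout), None), Message("hi"), timeout=1.0)
    b = await call_playbook_sketch(lambda: (ToyFlow(count), None), Message("hi"), timeout=1.0)
    return a, b


print(asyncio.run(demo()))  # ('HI', 2)
```

The first call shows the Message-unwrapping branch and the second the raw-result branch.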
Cancellation propagation:
- If you pass runtime=<parent PenguiFlow> and the message has a trace_id, call_playbook mirrors trace cancellation from the parent runtime into the subflow.
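The reason both conditions matter can be shown with a small asyncio model: the parent handle gives the subflow something to watch, and the trace_id is the key it watches. `ParentRuntime`, `run_with_mirrored_cancel`, and `TraceCancelled` are hypothetical names for illustration only, not PenguiFlow APIs.

```python
import asyncio
from typing import Any, Awaitable, Dict, Tuple


class TraceCancelled(Exception):
    """Hypothetical error raised when the parent cancels the trace."""


class ParentRuntime:
    """Hypothetical parent that tracks cancellation per trace_id."""

    def __init__(self) -> None:
        self._cancelled: Dict[str, asyncio.Event] = {}

    def cancel_event(self, trace_id: str) -> asyncio.Event:
        return self._cancelled.setdefault(trace_id, asyncio.Event())

    def cancel(self, trace_id: str) -> None:
        self.cancel_event(trace_id).set()


async def run_with_mirrored_cancel(
    parent: ParentRuntime, trace_id: str, subflow: Awaitable[Any]
) -> Any:
    sub_task = asyncio.ensure_future(subflow)
    watcher = asyncio.ensure_future(parent.cancel_event(trace_id).wait())
    try:
        done, _ = await asyncio.wait({sub_task, watcher}, return_when=asyncio.FIRST_COMPLETED)
        if sub_task in done:
            return sub_task.result()
        raise TraceCancelled(trace_id)  # the parent cancelled this trace first
    finally:
        # Cancel whatever is still pending (the subflow on cancellation, the
        # watcher on normal completion) and wait for both to unwind.
        for task in (sub_task, watcher):
            task.cancel()
        await asyncio.gather(sub_task, watcher, return_exceptions=True)


async def slow_subflow() -> str:
    await asyncio.sleep(10)
    return "finished"


async def fast_subflow() -> str:
    return "ok"


async def demo() -> Tuple[Any, bool]:
    parent = ParentRuntime()
    ok = await run_with_mirrored_cancel(parent, "t1", fast_subflow())
    # Cancel trace "t2" shortly after its subflow starts.
    asyncio.get_running_loop().call_later(0.05, parent.cancel, "t2")
    try:
        await run_with_mirrored_cancel(parent, "t2", slow_subflow())
        cancelled = False
    except TraceCancelled:
        cancelled = True
    return ok, cancelled


print(asyncio.run(demo()))  # ('ok', True)
```

The slow subflow would sleep for ten seconds, but the mirrored cancellation stops it after roughly 50 ms.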
Operational defaults (recommended)
- Use envelope messages (Message) when calling playbooks so correlation, deadlines, and cancellation behave deterministically.
- Keep playbooks small and purpose-built; avoid “god playbooks” that become hard to test and evolve.
- If a playbook touches external services, define explicit node-level timeouts and a retry policy in the playbook itself.
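In PenguiFlow these knobs belong on each playbook node's policy (see NodePolicy). The helper below only models the semantics you typically want: a per-attempt timeout plus bounded retries with exponential backoff. `call_with_policy` and its `timeout_s`, `max_retries`, and `backoff_base` parameters are assumptions of this sketch, not necessarily NodePolicy field names.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def call_with_policy(
    fn: Callable[[], Awaitable[T]],
    *,
    timeout_s: float,
    max_retries: int,
    backoff_base: float = 0.01,
) -> T:
    """Hypothetical helper: per-attempt timeout, exponential backoff retries."""
    attempt = 0
    while True:
        try:
            return await asyncio.wait_for(fn(), timeout_s)
        except (asyncio.TimeoutError, ConnectionError):
            # Only transient failures are retried; other errors surface at once.
            if attempt >= max_retries:
                raise
            await asyncio.sleep(backoff_base * (2 ** attempt))
            attempt += 1


calls = 0


async def flaky_service() -> str:
    """Simulated external service that fails twice, then succeeds."""
    global calls
    calls += 1
    if calls < 3:
        raise ConnectionError("transient failure")
    return "payload"


result = asyncio.run(call_with_policy(flaky_service, timeout_s=0.5, max_retries=3))
print(result, calls)  # payload 3
```

Restricting retries to timeouts and connection errors is a deliberate choice: retrying every exception would also retry deterministic bugs, such as validation failures, that will never succeed.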
Failure modes & recovery
- Subflow never returns: the playbook has no egress node (nothing routes to Rookery).
  - Fix: ensure the playbook graph has at least one node with no outgoing edges that returns or emits a value.
- Cancellation doesn’t propagate: you did not pass runtime=, or you aren’t using trace-scoped envelopes.
  - Fix: pass the parent runtime into call_playbook and ensure messages have a trace_id.
- Unexpected envelope loss: playbook nodes return bare payloads instead of a Message when you intended to preserve headers/meta.
  - Fix: adopt an envelope-consistent playbook style (Message in → Message out) or validate it with assert_preserves_message_envelope.
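The first and third failure modes can be caught before anything runs in production. This sketch models the playbook graph as an adjacency dict and the envelope as plain dataclasses. `egress_nodes`, `preserves_envelope`, and the stand-in `Message`/`Headers` classes are hypothetical; the documented helper for the envelope check is assert_preserves_message_envelope, whose signature is not covered here.

```python
from dataclasses import dataclass, replace
from typing import Any, Dict, List, Set


def egress_nodes(edges: Dict[str, List[str]]) -> Set[str]:
    """Nodes with no outgoing edges; in this model, their outputs reach Rookery."""
    all_nodes = set(edges) | {s for successors in edges.values() for s in successors}
    return {node for node in all_nodes if not edges.get(node)}


@dataclass(frozen=True)
class Headers:  # stand-in for penguiflow.Headers
    tenant: str


@dataclass(frozen=True)
class Message:  # stand-in for penguiflow.Message
    payload: Any
    headers: Headers
    trace_id: str


def preserves_envelope(inp: Message, out: Any) -> bool:
    """True if a node's output is still a Message with the same headers and trace."""
    return (
        isinstance(out, Message)
        and out.headers == inp.headers
        and out.trace_id == inp.trace_id
    )


pipeline = {"retrieve": ["rank"], "rank": ["summarize"], "summarize": []}
print(egress_nodes(pipeline))                    # {'summarize'}
print(egress_nodes({"a": ["b"], "b": ["a"]}))    # set(): this subflow never returns

msg = Message(payload="query", headers=Headers(tenant="demo"), trace_id="t1")
good = replace(msg, payload="answer")  # Message in -> Message out
print(preserves_envelope(msg, good), preserves_envelope(msg, "answer"))  # True False
```

An empty result from `egress_nodes` corresponds to the “subflow never returns” case: every node forwards somewhere, so nothing ever reaches the egress.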
Observability
call_playbook runs a separate flow instance, so attach observability to the playbook flow itself:
- middleware (runtime FlowEvent capture), and
- StateStore event persistence (audit).
Operational pattern:
- reuse the same middleware factory you use for parent flows (structured logging + derived metrics), and
- include trace_id in logs for correlation.
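A minimal sketch of the “one middleware factory, many flows” pattern: the same factory produces a middleware for the parent and one for the playbook, and both log the trace_id so the two streams can be joined. The `Event` dataclass and the event names are assumptions standing in for PenguiFlow's FlowEvent, and how the middleware is attached to a flow is PenguiFlow-specific and not shown.

```python
import asyncio
import json
import logging
from collections import Counter
from dataclasses import dataclass
from typing import Awaitable, Callable, Optional


@dataclass
class Event:  # hypothetical stand-in; PenguiFlow's FlowEvent has its own fields
    event_type: str
    node_name: str
    trace_id: Optional[str]


Middleware = Callable[[Event], Awaitable[None]]


def make_logging_middleware(flow_name: str, counters: Counter) -> Middleware:
    logger = logging.getLogger(f"flows.{flow_name}")

    async def middleware(event: Event) -> None:
        # One structured line per event; trace_id joins parent and playbook logs.
        logger.info(json.dumps({
            "flow": flow_name,
            "event": event.event_type,
            "node": event.node_name,
            "trace_id": event.trace_id,
        }))
        # Derived metric: event counts keyed by (flow, event type).
        counters[(flow_name, event.event_type)] += 1

    return middleware


logging.basicConfig(level=logging.INFO, format="%(message)s")
counters: Counter = Counter()
parent_mw = make_logging_middleware("parent", counters)
playbook_mw = make_logging_middleware("retrieval_playbook", counters)


async def demo() -> None:
    await parent_mw(Event("node_start", "parent", "t1"))
    await playbook_mw(Event("node_start", "sub", "t1"))
    await playbook_mw(Event("node_success", "sub", "t1"))


asyncio.run(demo())
print(dict(counters))
```

Keeping the flow name in every record lets one dashboard separate parent and playbook metrics while the shared trace_id still correlates them.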
Security / multi-tenancy notes
- Treat playbooks like any other flow: enforce tenant boundaries at ingress (Headers.tenant) and do not mix tenants inside a trace.
- If playbooks use external tool integrations, inject secrets via environment variables or a secret manager, and never log them.
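Both rules can be enforced with a few lines at the parent's ingress, before call_playbook is reached. `TenantGuard`, `TenantMismatch`, `redact`, and the `SECRET_KEYS` list are hypothetical illustrations. The guard keeps its trace-to-tenant map in memory, so a multi-process deployment would need shared storage instead.

```python
from dataclasses import dataclass
from typing import Any, Dict


@dataclass(frozen=True)
class Headers:  # stand-in for penguiflow.Headers
    tenant: str


class TenantMismatch(Exception):
    """Hypothetical error for missing or conflicting tenants."""


class TenantGuard:
    """Hypothetical ingress check: a trace_id is bound to exactly one tenant."""

    def __init__(self) -> None:
        self._tenant_by_trace: Dict[str, str] = {}

    def check(self, trace_id: str, headers: Headers) -> None:
        if not headers.tenant:
            raise TenantMismatch("missing tenant")
        # The first message on a trace binds it; later ones must match.
        bound = self._tenant_by_trace.setdefault(trace_id, headers.tenant)
        if bound != headers.tenant:
            raise TenantMismatch(
                f"trace {trace_id} belongs to {bound!r}, got {headers.tenant!r}"
            )


SECRET_KEYS = {"api_key", "token", "password"}  # hypothetical list


def redact(fields: Dict[str, Any]) -> Dict[str, Any]:
    """Mask secret-looking keys before a dict is logged."""
    return {k: ("***" if k.lower() in SECRET_KEYS else v) for k, v in fields.items()}


guard = TenantGuard()
guard.check("t1", Headers(tenant="acme"))
guard.check("t1", Headers(tenant="acme"))  # same tenant, same trace: allowed
try:
    guard.check("t1", Headers(tenant="globex"))
except TenantMismatch as exc:
    print("rejected:", exc)
print(redact({"tenant": "acme", "api_key": "sk-123"}))
```

Redacting by key name is only a safety net; the primary control is still never passing secrets through message payloads or headers in the first place.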
Runnable examples
The repo contains playbook examples you can run:
```shell
uv run python examples/playbook_retrieval/flow.py
uv run python examples/routing_with_playbooks/flow.py
uv run python examples/roadmap_status_updates_subflows/flow.py
```
Minimal example: calling a playbook from a parent node
This example demonstrates the shape of playbook invocation and cancellation mirroring.
```python
from __future__ import annotations

import asyncio

from penguiflow import Headers, Message, Node, NodePolicy, call_playbook, create


def make_subflow():
    """Playbook factory: returns (flow, registry); None means no ModelRegistry."""

    async def sub(msg: Message, _ctx) -> Message:
        # Message in -> Message out keeps headers and trace_id intact.
        return msg.model_copy(update={"payload": {"subflow": True, "payload": msg.payload}})

    sub_node = Node(sub, name="sub", policy=NodePolicy(validate="none"))
    # sub_node.to() declares no outgoing edges, so "sub" is the egress node.
    sub_flow = create(sub_node.to())
    return sub_flow, None


class Parent:
    def __init__(self) -> None:
        # Set after the parent flow is created so call_playbook can mirror cancellation.
        self.runtime = None

    async def run_subflow(self, msg: Message, _ctx) -> Message:
        assert self.runtime is not None
        # The subflow's egress result is a Message, so call_playbook returns its payload.
        result = await call_playbook(make_subflow, msg, timeout=2.0, runtime=self.runtime)
        return msg.model_copy(update={"payload": {"parent": True, "sub_result": result}})


async def main() -> None:
    parent = Parent()
    parent_node = Node(parent.run_subflow, name="parent", policy=NodePolicy(validate="none"))
    flow = create(parent_node.to())
    parent.runtime = flow
    flow.run()

    message = Message(payload={"hello": "world"}, headers=Headers(tenant="demo"))
    await flow.emit(message, trace_id=message.trace_id)
    print(await flow.fetch(trace_id=message.trace_id))
    await flow.stop()


if __name__ == "__main__":
    asyncio.run(main())
```
Troubleshooting checklist
- If you need retries/timeouts, configure them on nodes inside the playbook (not only in the parent).
- If you need distributed pause/resume and audit history, integrate a StateStore and consider remote transports instead of nested subflows.