Streaming¶
What it is / when to use it¶
PenguiFlow supports streaming partial output (for example LLM token emission) using:
- `StreamChunk` (payload model), and
- `Context.emit_chunk(...)` (runtime helper that emits chunks as `Message(payload=StreamChunk, ...)`).
Use streaming when you want:
- low-latency UI updates (SSE/WebSocket),
- long-running synthesis where partial output is valuable,
- consistent “status + chunks + final answer” contracts.
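As a concrete sketch of the low-latency UI case, streamed chunks can be bridged to SSE by wrapping each one in an event frame. The fields mirror the `StreamChunk` contract, but `chunk_to_sse`, the event name, and the JSON wire shape are hypothetical choices for this example, not part of PenguiFlow:

```python
import json


def chunk_to_sse(stream_id: str, seq: int, text: str, done: bool) -> str:
    """Format one stream chunk as a Server-Sent Events frame.

    Hypothetical bridge helper: field names mirror StreamChunk, but the
    event name and JSON shape are up to your UI contract.
    """
    payload = json.dumps(
        {"stream_id": stream_id, "seq": seq, "text": text, "done": done}
    )
    return f"event: chunk\ndata: {payload}\n\n"


frame = chunk_to_sse("trace-1", 0, "hello ", False)
assert frame.endswith("\n\n")  # SSE frames are terminated by a blank line
```

Your HTTP layer would write each frame to the response stream as chunks arrive, flushing after every frame.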
Non-goals / boundaries¶
- Streaming does not guarantee delivery ordering across different `stream_id`s.
- Streaming does not replace your final result contract; you still need a final, canonical output (often `FinalAnswer`).
- Streaming does not persist chunks unless you persist runtime events and/or store artifacts yourself.
Contract surface¶
Context.emit_chunk(...)¶
emit_chunk signature (core runtime):
- requires a `parent: Message`
- emits a `Message` whose payload is a `StreamChunk`
Key behaviors:
- the default `stream_id` is the parent's `trace_id`
- `seq` is monotonically increasing per `stream_id` (unless you supply it explicitly)
- `done=True` ends the stream and resets internal sequence tracking
Note
emit_chunk requires a parent Message so chunks inherit trace_id, routing headers, deadlines, and metadata.
Recommended envelope style¶
If you want streaming, use envelope-style flows:
- nodes accept and emit `Message`
- chunks and final answers share the same `trace_id`
See Messages & envelopes.
Operational defaults¶
- Always stream to a dedicated sink node (don’t rely on side effects in the composing node).
- Use a single `trace_id` to correlate chunks and the final answer.
- Keep chunk sizes small; treat the stream as "progressive rendering", not a blob transport.
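For the "keep chunk sizes small" default, a simple slicer over the composed text is usually enough. `iter_chunks` and its `max_len` knob are illustrative helpers, not part of the PenguiFlow API:

```python
def iter_chunks(text: str, max_len: int = 64):
    """Yield small slices suitable for progressive rendering.

    max_len is an illustrative knob, not a PenguiFlow setting: pick a
    size that matches how your UI paints partial output.
    """
    for start in range(0, len(text), max_len):
        yield text[start:start + max_len]


assert list(iter_chunks("hello world", max_len=4)) == ["hell", "o wo", "rld"]
```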
Failure modes & recovery¶
No chunks appear¶
Likely causes
- your node is not receiving a `Message` envelope (so you can't call `emit_chunk`)
- you aren't routing chunks to a live consumer (sink node / UI bridge)
Fix
- switch to envelope-style (`Message`) for streaming flows
- add a chunk sink node and ensure it is connected in the graph
Duplicate or missing chunks¶
Likely causes
- you are emitting chunks with manual `seq` values
- multiple producers emit to the same `stream_id` concurrently
Fix
- let `Context.emit_chunk` assign `seq`
- use one producer per `stream_id` (or separate stream ids)
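One lightweight way to keep producers separated is to derive a distinct stream id per producer, assuming your runtime lets you supply a stream id explicitly. The `stream_id_for` naming convention below is hypothetical; any scheme that yields unique ids works:

```python
def stream_id_for(trace_id: str, producer: str) -> str:
    """Derive a distinct stream id per producer.

    Hypothetical convention: any scheme that keeps producers on
    separate stream ids avoids interleaved seq values.
    """
    return f"{trace_id}:{producer}"


ids = {stream_id_for("trace-1", p) for p in ("summary", "citations")}
assert len(ids) == 2  # each producer owns its own contiguous sequence
```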
Backpressure stalls streaming¶
Chunks are normal messages and can back up behind bounded queues.
Fix
- ensure the sink consumer is fast
- tune `queue_maxsize` for the chunk edge, and avoid unbounded queues unless you accept the memory risk
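The stall mechanics can be reproduced with a plain bounded `asyncio.Queue`, which is how a bounded chunk edge behaves: once the queue fills, the producer's `put()` awaits until the sink drains it. The queue and sink here are a simulation, not PenguiFlow objects:

```python
import asyncio


async def demo() -> bool:
    # Models a chunk edge with queue_maxsize=2 feeding a slow sink.
    queue: asyncio.Queue[str] = asyncio.Queue(maxsize=2)
    stalled = False

    async def producer() -> None:
        nonlocal stalled
        for i in range(5):
            if queue.full():
                stalled = True          # emission is now backpressured
            await queue.put(f"chunk-{i}")
        await queue.put("done")

    async def slow_sink() -> None:
        while await queue.get() != "done":
            await asyncio.sleep(0.01)   # simulate a slow consumer

    await asyncio.gather(producer(), slow_sink())
    return stalled


print("stalled:", asyncio.run(demo()))
```

The producer observes a full queue as soon as the sink falls behind, which is exactly what a stalled streaming node looks like from the inside.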
Observability¶
Streaming is visible in runtime events:
- chunk sink nodes will emit `node_*` events like any other node
- queue depth signals tell you if chunk delivery is backing up
If you bridge to a UI, you typically also log:
- `stream_id`, `seq`, and chunk sizes
- time-to-first-chunk and time-to-final metrics
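A minimal tracker for the two timing metrics might look like this; `StreamTimer` is an illustrative helper you would wire into your UI bridge, not a library class:

```python
import time


class StreamTimer:
    """Track time-to-first-chunk and time-to-final for one stream
    (illustrative helper, not a PenguiFlow class)."""

    def __init__(self) -> None:
        self._start = time.monotonic()
        self.first_chunk: float | None = None
        self.final: float | None = None

    def on_chunk(self) -> None:
        if self.first_chunk is None:       # only the first chunk counts
            self.first_chunk = time.monotonic() - self._start

    def on_final(self) -> None:
        self.final = time.monotonic() - self._start


timer = StreamTimer()
timer.on_chunk()
timer.on_final()
print(f"ttfc={timer.first_chunk:.6f}s ttf={timer.final:.6f}s")
```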
Security / multi-tenancy notes¶
- Never stream secrets. Streams are user-visible surfaces.
- Correlate by `Headers.tenant` and `trace_id` to avoid cross-tenant mixing.
Minimal end-to-end pattern¶
The simplest reliable pattern is to route chunks to a dedicated sink node:
```mermaid
flowchart LR
    A["compose_final"] --> C["chunk_sink"]
    A --> F["final_answer"]
    C --> R["Rookery"]
    F --> R
```
In practice:
- Your "compose" node receives a `Message` envelope.
- It emits `StreamChunk`s to `chunk_sink`.
- It returns (or emits) a final `FinalAnswer` as the canonical result.
Runnable example¶
```python
from __future__ import annotations

import asyncio

from penguiflow import Headers, Message, Node, NodePolicy, create
from penguiflow.types import FinalAnswer


async def chunk_sink(msg: Message, _ctx) -> None:
    chunk = msg.payload
    print(chunk.text, end="")
    if chunk.done:
        print("")


async def deliver_final(msg: Message, _ctx) -> FinalAnswer:
    return msg.payload


async def compose(msg: Message, ctx) -> None:
    await ctx.emit_chunk(parent=msg, text="hello ", to=chunk_node)
    await ctx.emit_chunk(parent=msg, text="world", done=True, to=chunk_node)
    final = msg.model_copy(update={"payload": FinalAnswer(text="hello world")})
    await ctx.emit(final, to=final_node)


chunk_node = Node(chunk_sink, name="chunk_sink", policy=NodePolicy(validate="none"))
final_node = Node(deliver_final, name="final", policy=NodePolicy(validate="none"))
compose_node = Node(compose, name="compose", policy=NodePolicy(validate="none"))


async def main() -> None:
    flow = create(
        compose_node.to(chunk_node, final_node),
        chunk_node.to(),
        final_node.to(),
    )
    flow.run()

    message = Message(payload={"request": "ignored"}, headers=Headers(tenant="demo"))
    await flow.emit(message, trace_id=message.trace_id)

    result = await flow.fetch(trace_id=message.trace_id)
    print("Final:", result.text)

    await flow.stop()


if __name__ == "__main__":
    asyncio.run(main())
```
See `examples/roadmap_status_updates/flow.py` for a complete, tested implementation (status + chunks + final answer).
Troubleshooting checklist¶
- Chunks print but final answer missing: ensure you emit a final result to an egress node (Rookery) and `fetch(trace_id=...)`.
- Final answer arrives but chunks don't: confirm chunk routing to the sink edge and that the sink is connected.
- Streaming is slow: check queue depths and sink performance; tune `queue_maxsize` and reduce chunk volume.
Further reading (internal notes)¶
- `docs/agui/flow-tool-calls.md` (tool-call event mapping)
- `docs/patterns/roadmap_status_updates.md` (status + roadmap streaming pattern)