Master multi-agent orchestration with LangGraph, AutoGen teams, and OpenAI handoffs. Learn DAG-style routing, typed shared state, protocol boundaries, and human-in-the-loop controls for reliable AI systems.
Agent failure handling gives one worker a recovery plan. Multi-agent orchestration asks a different question: when should the system stop giving one agent every responsibility and split the job into typed nodes, explicit edges, persistent state, and approval points?
Multi-agent orchestration splits a hard workflow into specialized steps with explicit dependencies. This article shows when a graph makes control flow clearer than asking one model call to plan, retrieve, decide, and act alone. A graph becomes safer only when it also limits capabilities, validates merged evidence, and binds approvals to exact side effects.
Imagine a single warehouse worker who has to find your package in a sprawling fulfillment center, check the carrier tracking, verify the refund policy, and write the customer email, all while ten other orders are backing up. Even a strong worker will miss details, mix up labels, and forget which shelf they already checked. Real warehouses solve this with teams: one person picks, another packs, a third updates tracking, and a floor manager makes sure each handoff happens at the right moment.
Modern AI systems face the same bottleneck. A single LLM call can research, analyze, and write, but as tasks get complex it starts to forget details (context window saturation) and blur roles. The fix is multi-agent orchestration: specialized agents for specialized jobs, connected by an explicit workflow graph. Instead of one prompt that tries to do everything, you build a small pipeline where a triage agent classifies the request, an order-lookup agent queries the database, and a response formatter composes the reply.
This article builds on ideas you have already practiced: structured outputs, ReAct loops, human approval gates, memory, and failure recovery. If those feel fuzzy, review the prerequisite lessons first. Here, we take the next step: wiring multiple specialized agents into a graph so they collaborate on a single job without hiding state in a transcript.
AutoGen[1] and MetaGPT[2] were early demonstrations that structured roles and explicit coordination improve multi-step execution over ad-hoc multi-agent chat. The AutoGen paper reported gains across coding, math, and question answering, while MetaGPT showed that explicit standard operating procedures produce more coherent artifacts than open-ended chat. Those results moved multi-agent work from a prompt trick to a systems design problem.
Before you reach for a graph of agents, take the cost seriously. First-party engineering reports now make the boundary clearer: parallel exploration can pay off, while coupled decisions and unbounded writes make multi-agent systems harder to control.
Cognition, the team behind Devin[3], published a widely read argument titled "Don't Build Multi-Agents."[4] Their position: when you split work across parallel subagents, each subagent acts on its own slice of context. Because every action carries implicit decisions, those decisions conflict when the subagents cannot see each other's full traces. The result is a system that looks parallel but produces inconsistent, hard-to-reconcile output. Their recommended default is a single agent with strong context engineering, and where you must split work, share the complete trace rather than isolated messages.
In April 2026, Cognition updated that position with a narrower pattern they said was working in practice: multiple agents may contribute intelligence, but writes stay single-threaded.[5] That refinement is central to production design. Parallel research, retrieval, and proposal branches do not justify parallel payment, address-change, or label-generation writes.
Within days of Cognition's original post, Anthropic published "How we built our multi-agent research system"[6], describing the opposite choice for its Research feature: a lead agent that spawns 3 to 5 subagents, each exploring a different part of the question in its own context window. On Anthropic's internal research eval, this setup outperformed a single Opus 4 agent by 90.2%. The catch they state plainly: multi-agent systems use about 15 times more tokens than a chat interaction, so the task value has to justify the spend. They also name where multi-agent is a poor fit: domains where all agents must share the same context or where there are heavy dependencies between agents. They cite coding as exactly that kind of poor fit, because subtasks are tightly coupled and agents struggle to coordinate in real time.
Read together, Cognition's warning and refinement plus Anthropic's research system describe the same trade-off from different task shapes:
| Signal | Favors a single strong agent | Favors multiple agents |
|---|---|---|
| Subtask coupling | Tight, decisions depend on each other | Loose, subtasks explore independent slices |
| Context sharing | Every step needs the full trace | Each branch needs only its own slice |
| Parallelism | Little, the work is sequential | High, branches run at the same time |
| Context size | Fits one window with good engineering | Exceeds one window, needs compression |
| Task value | Routine, cost-sensitive | High value, can pay 10 to 15x token cost |
| External writes | One bounded writer | Parallel evidence only, then one controlled writer |
The practical rule for 2026: reach for multi-agent only when subtasks are genuinely separable and parallel, and when the task value justifies the token and failure-surface cost. Otherwise a single agent with disciplined context engineering is cheaper and easier to debug. The rest of this article assumes you have cleared that bar and now need to wire the graph correctly.
Here is a small admission check for that decision. Parallel branches win only when work is independent, valuable enough to fund extra execution, and able to merge through a clear contract.

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class TaskShape:
    independent_branches: int
    needs_shared_trace: bool
    value_justifies_extra_cost: bool
    merge_contract_defined: bool

def choose_orchestration(task: TaskShape) -> str:
    # All four conditions must hold before parallel branches are worth it
    if (
        task.independent_branches >= 2
        and not task.needs_shared_trace
        and task.value_justifies_extra_cost
        and task.merge_contract_defined
    ):
        return "parallel graph"
    return "single scoped agent"

research = TaskShape(4, False, True, True)
code_edit = TaskShape(3, True, True, False)

print("research:", choose_orchestration(research))
print("coupled code edit:", choose_orchestration(code_edit))
```

```text
research: parallel graph
coupled code edit: single scoped agent
```

In the simplest case, an agent pipeline is a straight line: the user asks a question, agent A looks it up, agent B analyzes it, and agent C replies. For a narrow task like "What is the return policy?", a chain works fine. But real customer inquiries are messy. A shopper might ask, "Where is my order, and can I change the delivery address?" That touches inventory, shipping rules, and address validation at the same time. A rigid assembly line forces you to process these in sequence even when they could run in parallel.
A naive pipeline processes steps sequentially, handing output directly from one task to the next. The visual below compares that rigid path with a DAG route that can branch by intent and merge results before the final answer.
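Linear chains break the moment you need to branch by intent, run independent steps in parallel, or merge several results before the final answer.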
A Directed Acyclic Graph (DAG) models your agent workflow as nodes (agents or functions) and edges (dependencies). Think of it as a flowchart that defines routing rules: "If the Triage Agent says 'shipping question', go to the Shipping Analyst. If it says 'refund request', go to the Policy Agent." A DAG gives you branching and parallelism without back-edges.
Once you add retry loops, review loops, or approval loops, you are no longer dealing with a strict DAG. You're dealing with a state machine or a more general graph. LangGraph supports both. In the DAG route above, the triage node can fan out to specialists and every successful branch still converges on the formatter without cycling.
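The DAG-versus-general-graph distinction can be checked mechanically before a workflow is wired up. The sketch below is a minimal illustration, assuming the routes are described as a plain dictionary of node names that mirrors the order-tracking example; is_dag is a hypothetical helper, not a LangGraph API. It relies on the standard library's graphlib, which raises CycleError when a back-edge exists.

```python
from graphlib import CycleError, TopologicalSorter

def is_dag(edges: dict[str, set[str]]) -> bool:
    """Return True when the edge map (node -> successors) contains no cycle."""
    try:
        # graphlib treats the mapping as node -> predecessors; a cycle is a
        # cycle in either orientation, so a successor map works for this check.
        tuple(TopologicalSorter(edges).static_order())
        return True
    except CycleError:
        return False

# Hypothetical edge map mirroring the order-tracking routes
routes = {
    "triage": {"lookup_agent", "faq_agent", "policy_agent"},
    "lookup_agent": {"shipping_analyst"},
    "shipping_analyst": {"response_formatter"},
    "faq_agent": {"response_formatter"},
    "policy_agent": {"response_formatter"},
    "response_formatter": set(),
}
# Adding a retry edge back to triage introduces a cycle
with_retry = {**routes, "response_formatter": {"triage"}}

print("routes is a DAG:", is_dag(routes))
print("with retry loop is a DAG:", is_dag(with_retry))
```

A check like this only describes the shape of the graph. Cycles are legitimate once retries or review loops are intended, but they then need an explicit exit condition.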
The control flow is explicit. Given the same state and routing decisions, you can inspect why execution moved from one node to the next. That's much easier to debug than an open-ended conversation loop where an LLM silently decides the next speaker.
Explicit routing is not authorization. Keep read-only evidence branches separate from any node that can issue credits, change an address, or send an external notification. A write-capable node must still require policy checks, a bound approval when needed, and an idempotency key.
LangGraph is a graph-based framework for multi-agent orchestration. Its core primitives are a state schema, node functions, and edges.[7]
A shared TypedDict (a Python type hint that adds static typing to dictionaries) flows through the graph, serving as the memory for all agents. The code below defines a compact teaching schema for our order-tracking graph. It includes message history, the current routing step, retrieved order data, and an error count. In production, sensitive or large order records should remain in an access-controlled store; graph state should carry a scoped reference plus the verified fields downstream nodes need.

```python
from typing import Annotated
from typing_extensions import TypedDict
from langchain_core.messages import AIMessage, AnyMessage
from langgraph.graph.message import add_messages

OrderData = dict[str, str]

class AgentState(TypedDict):
    # Messages accumulate using the add_messages reducer
    messages: Annotated[list[AnyMessage], add_messages]
    current_step: str
    order_data: OrderData | None
    shipping_status: str | None
    final_answer: str | None
    error_count: int
    next_worker: str | None
```

Key insight: The Annotated[list[AnyMessage], add_messages] pattern tells LangGraph to merge updates by appending rather than overwriting. If two parallel branches both append messages, you get both sets in the final list.
Nodes are standard Python functions that transform the graph's state. The functions below implement an order lookup and a shipping analysis step. Each takes the current AgentState, performs external read calls (like a database query or a carrier API call), and returns a dictionary of state updates. LangGraph merges those updates into the global state automatically.
In the snippet, order_db and carrier_api are application adapters. The complete local smoke test after the routing section uses in-memory adapters so you can run the graph without API keys.

```python
def lookup_node(state: AgentState) -> dict[str, object]:
    """Order lookup agent: queries the database and returns order details."""
    query = state["messages"][-1].content
    # Example: using a search tool or database wrapper
    results = order_db.search(query)
    return {
        "order_data": results,
        "current_step": "shipping_check",
        # Appends to messages list due to Annotated[list, add_messages]
        "messages": [AIMessage(content=f"Found order {results['order_id']}")],
    }

def shipping_node(state: AgentState) -> dict[str, object]:
    """Shipping analyst: checks carrier tracking for the order."""
    order = state["order_data"]
    if order is None:
        raise RuntimeError("Missing order data")
    tracking = carrier_api.track(order["tracking_number"])
    return {
        "shipping_status": tracking.status,
        "final_answer": f"Order {order['order_id']} is {tracking.status}.",
        "current_step": "format",
        "messages": [AIMessage(content=f"Carrier status: {tracking.status}")],
    }
```

Notice that each node returns only the fields it changes. LangGraph patches those into the existing state rather than replacing the whole object. That means the order_data produced by lookup_node is still visible when shipping_node runs, even though shipping_node doesn't return order_data again.
Conditional edges define the graph's control flow based on the current state. A triage node can write a routing decision into shared state, and a small router function can read that decision and return the next node name. That keeps the classification logic inside a node while keeping the edge function deterministic and easy to inspect.

```python
from typing import Literal
from langchain_core.messages import AIMessage
from langgraph.graph import StateGraph, START, END

Route = Literal["lookup_agent", "faq_agent", "policy_agent"]

def triage_node(state: AgentState) -> dict[str, object]:
    user_query = state["messages"][-1].content.lower()

    if any(token in user_query for token in ("where", "track", "shipping")):
        route: Route = "lookup_agent"
    elif any(token in user_query for token in ("return", "refund", "policy")):
        route = "policy_agent"
    else:
        route = "faq_agent"

    return {"next_worker": route}

def faq_node(state: AgentState) -> dict[str, object]:
    # llm is a chat model configured elsewhere in the application
    answer = llm.invoke(
        f"Answer briefly from the FAQ: {state['messages'][-1].content}"
    )
    return {"final_answer": answer.content, "current_step": "format"}

def policy_node(state: AgentState) -> dict[str, object]:
    answer = llm.invoke(
        f"Explain the return policy for: {state['messages'][-1].content}"
    )
    return {"final_answer": answer.content, "current_step": "format"}

def format_node(state: AgentState) -> dict[str, object]:
    return {
        "current_step": "complete",
        "messages": [AIMessage(content=state["final_answer"] or "No answer generated yet.")],
    }

def router(state: AgentState) -> Route:
    # Deterministic: only reads the decision that triage_node already wrote
    return state["next_worker"] or "faq_agent"

# Build the graph
builder = StateGraph(AgentState)
builder.add_node("triage", triage_node)
builder.add_node("lookup_agent", lookup_node)
builder.add_node("faq_agent", faq_node)
builder.add_node("policy_agent", policy_node)
builder.add_node("shipping_analyst", shipping_node)
builder.add_node("response_formatter", format_node)

builder.add_edge(START, "triage")

# Conditional edges handle dynamic routing
builder.add_conditional_edges("triage", router)

# Standard edges define fixed transitions and the final fan-in
builder.add_edge("lookup_agent", "shipping_analyst")
builder.add_edge("shipping_analyst", "response_formatter")
builder.add_edge("faq_agent", "response_formatter")
builder.add_edge("policy_agent", "response_formatter")
builder.add_edge("response_formatter", END)

app = builder.compile()
```

If you invoke this graph with the query "Where is order 7421?", the trace looks like this:

1. triage_node reads the query and writes "lookup_agent" into next_worker.
2. The router returns "lookup_agent", so LangGraph calls lookup_node.
3. lookup_node fetches order 7421, writes order_data, and sets current_step to "shipping_check".
4. The edge lookup_agent -> shipping_analyst triggers shipping_node.
5. shipping_node queries the carrier, writes shipping_status, stores final_answer, and sets current_step to "format".
6. The edge shipping_analyst -> response_formatter triggers format_node.
7. format_node reads final_answer and appends the final message.
8. The edge response_formatter -> END terminates the run.

Production tip: Keep router functions deterministic. If the routing decision depends on an LLM call, perform that call inside a node (like triage_node), write the result into state, and let the edge function read it. That way you can log the exact routing decision without re-running the LLM during debugging.
Routing also needs a capability boundary. A customer mentioning a refund may route to a policy lookup and review proposal, but it must not cause a payment write merely because a specialist node exists.

```python
ROUTES = {
    "tracking": ("order_read", "carrier_read", "draft_reply"),
    "refund": ("order_read", "policy_read", "propose_refund", "human_review"),
}

def route_for(message: str) -> tuple[str, ...]:
    intent = "refund" if "refund" in message.lower() else "tracking"
    return ROUTES[intent]

path = route_for("Please refund order 7421")
# No route contains a payment write; a refund request ends at human review
assert "issue_credit" not in path
print("route:", " -> ".join(path))
print("external write included:", "issue_credit" in path)
```

```text
route: order_read -> policy_read -> propose_refund -> human_review
external write included: False
```

Here is the same graph as a complete local smoke test. It uses fake order and tracking data, but the LangGraph state, nodes, conditional edge, fixed edges, and invocation are real:

```python
from typing import Annotated, Literal
from typing_extensions import TypedDict

from langchain_core.messages import AIMessage, AnyMessage, HumanMessage
from langgraph.graph import END, START, StateGraph
from langgraph.graph.message import add_messages

OrderData = dict[str, str]

class AgentState(TypedDict):
    messages: Annotated[list[AnyMessage], add_messages]
    current_step: str
    order_data: OrderData | None
    shipping_status: str | None
    final_answer: str | None
    error_count: int
    next_worker: str | None

# In-memory stand-ins for the order database and carrier API
ORDERS = {"7421": {"order_id": "7421", "tracking_number": "1Z999"}}
TRACKING = {"1Z999": "in_transit"}
Route = Literal["lookup_agent", "faq_agent", "policy_agent"]

def triage_node(state: AgentState) -> dict[str, object]:
    user_query = state["messages"][-1].content.lower()
    if any(token in user_query for token in ("where", "track", "shipping")):
        route: Route = "lookup_agent"
    elif any(token in user_query for token in ("return", "refund", "policy")):
        route = "policy_agent"
    else:
        route = "faq_agent"
    return {"next_worker": route}

def router(state: AgentState) -> Route:
    return state["next_worker"] or "faq_agent"

def lookup_node(state: AgentState) -> dict[str, object]:
    query = state["messages"][-1].content
    order_id = "7421" if "7421" in query else "unknown"
    order = ORDERS[order_id]
    return {
        "order_data": order,
        "current_step": "shipping_check",
        "messages": [AIMessage(content=f"Found order {order['order_id']}")],
    }

def shipping_node(state: AgentState) -> dict[str, object]:
    order = state["order_data"]
    if order is None:
        raise ValueError("order_data must be populated before shipping_node runs")
    tracking = TRACKING[order["tracking_number"]]
    return {
        "shipping_status": tracking,
        "final_answer": f"Order {order['order_id']} is {tracking}.",
        "current_step": "format",
        "messages": [AIMessage(content=f"Carrier status: {tracking}")],
    }

def faq_node(state: AgentState) -> dict[str, object]:
    return {"final_answer": "FAQ answer", "current_step": "format"}

def policy_node(state: AgentState) -> dict[str, object]:
    return {"final_answer": "Policy answer", "current_step": "format"}

def format_node(state: AgentState) -> dict[str, object]:
    return {
        "current_step": "complete",
        "messages": [
            AIMessage(content=state["final_answer"] or "No answer generated yet.")
        ],
    }

builder = StateGraph(AgentState)
builder.add_node("triage", triage_node)
builder.add_node("lookup_agent", lookup_node)
builder.add_node("faq_agent", faq_node)
builder.add_node("policy_agent", policy_node)
builder.add_node("shipping_analyst", shipping_node)
builder.add_node("response_formatter", format_node)

builder.add_edge(START, "triage")
builder.add_conditional_edges("triage", router)
builder.add_edge("lookup_agent", "shipping_analyst")
builder.add_edge("shipping_analyst", "response_formatter")
builder.add_edge("faq_agent", "response_formatter")
builder.add_edge("policy_agent", "response_formatter")
builder.add_edge("response_formatter", END)

app = builder.compile()
result = app.invoke({
    "messages": [HumanMessage(content="Where is order 7421?")],
    "current_step": "start",
    "order_data": None,
    "shipping_status": None,
    "final_answer": None,
    "error_count": 0,
    "next_worker": None,
})

print("shipping_status:", result["shipping_status"])
print("final message:", result["messages"][-1].content)
print("route completed:", result["shipping_status"] == "in_transit")
```

```text
shipping_status: in_transit
final message: Order 7421 is in_transit.
route completed: True
```

Once you can build a single graph, the next question is how to arrange agents when the problem grows. There are four common topologies, each with different trade-offs.
One LLM acts as a manager that delegates tasks to specialized workers. That pattern works when the sub-steps aren't known in advance. In our order system, a supervisor might look at the conversation history and decide whether to call the lookup agent, the policy agent, or a human escalation path.

```python
from langchain_core.messages import SystemMessage

def supervisor_node(state: AgentState) -> dict[str, object]:
    """Supervisor decides which worker to invoke next."""
    # llm is a chat model configured elsewhere in the application
    response = llm.invoke([
        SystemMessage(content="""You are a supervisor managing these workers:
        - order_lookup: Queries the order database
        - shipping_analyst: Checks carrier tracking
        - policy_agent: Explains return and refund rules
        - response_formatter: Writes the final customer reply

        Based on the conversation, decide which worker to invoke next,
        or respond with FINISH if the task is complete."""),
        *state["messages"]
    ])
    return {"messages": [response], "next_worker": response.content}
```

Supervisors offer simple, centralized control. When a refund request turns out to need both order lookup and policy explanation, the supervisor can sequence them explicitly rather than hard-coding every combination in the graph edges.
Supervisors become a bottleneck and a single point of failure. Every step costs an extra LLM call, which adds latency and token cost. Supervisors also rely on the LLM outputting structured routing instructions (like "order_lookup"), which can be brittle with smaller models.
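One way to reduce that brittleness is to validate the supervisor's raw text before it becomes a routing decision. The sketch below is an illustration under stated assumptions: parse_supervisor_choice and the human_escalation fallback are hypothetical names, and the allow-list simply repeats the worker names from the supervisor prompt.

```python
# Worker names the supervisor prompt advertises, plus the FINISH sentinel
ALLOWED_WORKERS = {
    "order_lookup",
    "shipping_analyst",
    "policy_agent",
    "response_formatter",
    "FINISH",
}

def parse_supervisor_choice(raw: str, fallback: str = "human_escalation") -> str:
    """Normalize the supervisor's reply and reject anything outside the allow-list."""
    # Drop surrounding whitespace, quotes, backticks, and trailing periods
    candidate = raw.strip().strip("`\"'. ")
    # FINISH is matched case-insensitively; worker names are compared lower-cased
    if candidate.upper() == "FINISH":
        return "FINISH"
    candidate = candidate.lower()
    return candidate if candidate in ALLOWED_WORKERS else fallback

print(parse_supervisor_choice(" order_lookup\n"))
print(parse_supervisor_choice("I think we should call policy_agent next"))
print(parse_supervisor_choice("finish."))
```

Free-form sentences fall through to the fallback instead of being guessed at, so a malformed reply is routed to a safe path rather than to an arbitrary worker.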
For complex enterprise workflows, a single supervisor often gets overwhelmed. The solution is supervisors of supervisors, forming a hierarchical tree. In a large fulfillment operation, a "Floor Manager" agent might manage an "Inbound Team" (a sub-supervisor coordinating receivers and inspectors) and an "Outbound Team" (a sub-supervisor coordinating pickers and packers).
Hierarchical structures scale for massive tasks and offer clear separation of concerns. No single manager has to remember every detail of every sub-team.
Deep hierarchies add significant latency. Handoff dilution can occur, where the original customer intent loses detail as it passes through multiple layers of managers. A request that needs only one database lookup might now take three LLM calls just to decide who does the lookup.
Agents hand off to each other directly without a long-lived supervisor. OpenAI's experimental Swarm repo[8] made this pattern easy to study, and the current OpenAI Agents SDK exposes the same idea through first-class handoffs.[9][10] Handoffs are represented to the model as transfer tools, and input filters can control what history the receiving agent sees.[10] In the example below, the triage agent doesn't solve the task itself. It selects the specialist that should take over.

```python
import asyncio
from agents import Agent, Runner

returns_agent = Agent(
    name="Returns Agent",
    handoff_description="Handles refund requests and return labels.",
    instructions="You process returns clearly and concisely.",
)

tracking_agent = Agent(
    name="Tracking Agent",
    handoff_description="Handles package location and delivery updates.",
    instructions="You look up orders and explain carrier status.",
)

# The triage agent only routes; each handoff target takes over the conversation
triage_agent = Agent(
    name="Triage Agent",
    instructions="Route the customer to one specialist.",
    handoffs=[returns_agent, tracking_agent],
)

async def main():
    result = await Runner.run(
        triage_agent,
        "I need to return a pair of shoes.",
    )
    print(result.final_output)
    print(f"Answered by: {result.last_agent.name}")

asyncio.run(main())
```

If the orchestrator should keep ownership of the conversation and only call specialists for sub-tasks, use specialists as tools instead of handoffs.
There's no supervisor bottleneck, agents are loosely coupled, and it's easy to add new specialists.
It's harder to debug because the flow is emergent rather than explicit. Circular handoffs (A -> B -> A) are common and harder to detect than in a DAG.
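A simple guard for that failure mode is to record each handoff and stop when an agent reappears or a hop budget runs out. This is a framework-agnostic sketch: HandoffGuard and max_hops are hypothetical names, not part of Swarm or the OpenAI Agents SDK.

```python
class HandoffGuard:
    """Tracks the handoff path for one run and blocks loops or runaway chains."""

    def __init__(self, max_hops: int = 4) -> None:
        self.max_hops = max_hops
        self.path: list[str] = []

    def record(self, agent_name: str) -> None:
        # A repeated agent name means control has cycled back (A -> B -> A)
        if agent_name in self.path:
            loop = " -> ".join(self.path + [agent_name])
            raise RuntimeError(f"circular handoff: {loop}")
        if len(self.path) >= self.max_hops:
            raise RuntimeError("handoff budget exceeded")
        self.path.append(agent_name)

guard = HandoffGuard(max_hops=4)
for name in ("triage", "returns", "triage"):
    try:
        guard.record(name)
    except RuntimeError as exc:
        print("blocked:", exc)
print("path:", " -> ".join(guard.path))
```

Rejecting every revisit is deliberately strict. Some flows legitimately return to triage, so a production guard might allow a bounded number of returns instead of none.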
This pattern uses a parallel fan-out for "embarrassingly parallel" tasks, followed by an aggregation step. It's ideal for tasks like checking a large order from multiple perspectives (stock availability, warehouse location, carrier capacity). In the topology visual above, this is the fan-out shape: one router emits independent analyst tasks, and a reducer merges their findings into one delivery estimate.
To implement a true map-reduce pattern in current LangGraph, the cleanest option is the Send API. It lets one node emit a variable number of parallel tasks, each with its own payload, and then merge the branch outputs through reducers.[7]

```python
import operator
from typing import Annotated
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.types import Send

class MapReduceState(TypedDict):
    items: list[str]
    # operator.add lets parallel branches append findings without overwriting
    findings: Annotated[list[str], operator.add]
    combined_report: str

class AnalystTask(TypedDict):
    item: str

def fan_out_node(state: MapReduceState) -> dict[str, object]:
    return {"items": ["stock", "warehouse_slot", "carrier_route"]}

def analyst_node(state: AnalystTask) -> dict[str, object]:
    item = state["item"]
    return {"findings": [f"{item}: checked and ready"]}

def route_to_analysts(state: MapReduceState) -> list[Send]:
    # One Send per item: each analyst run receives its own payload
    return [Send("analyst", {"item": item}) for item in state["items"]]

def reducer_node(state: MapReduceState) -> dict[str, object]:
    return {"combined_report": "\n".join(state["findings"])}

builder = StateGraph(MapReduceState)
builder.add_node("fan_out", fan_out_node)
builder.add_node("analyst", analyst_node)
builder.add_node("reducer", reducer_node)

builder.add_edge(START, "fan_out")
builder.add_conditional_edges("fan_out", route_to_analysts, ["analyst"])
builder.add_edge("analyst", "reducer")
builder.add_edge("reducer", END)

graph = builder.compile()

result = graph.invoke({"items": [], "findings": [], "combined_report": ""})
print(result["combined_report"])
print("finding count:", len(result["findings"]))
print("contains carrier route:", "carrier_route" in result["combined_report"])
```

```text
stock: checked and ready
warehouse_slot: checked and ready
carrier_route: checked and ready
finding count: 3
contains carrier route: True
```

One practical detail: parallel branches should either write disjoint state keys or use reducers for shared keys such as Annotated[list[str], operator.add]. Otherwise the runtime has no safe way to merge their updates.
The reducer is also a trust boundary. It should admit only findings for the current tenant and request, with recorded evidence, before any formatter or decision node consumes them.

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class Finding:
    tenant_id: str
    claim: str
    source_ref: str
    verified: bool

def reduce_findings(findings: list[Finding], tenant_id: str) -> list[str]:
    accepted: list[str] = []
    for finding in findings:
        # Fail closed: one foreign or unverified finding rejects the whole merge
        if finding.tenant_id != tenant_id:
            raise ValueError("tenant mismatch")
        if not finding.verified or not finding.source_ref:
            raise ValueError("missing verified evidence")
        accepted.append(finding.claim)
    return accepted

safe = [Finding("shop-7", "carrier status: in transit", "carrier:evt-81", True)]
foreign = [Finding("shop-8", "refund eligible", "orders:92", True)]

print("accepted:", reduce_findings(safe, "shop-7"))
try:
    reduce_findings(safe + foreign, "shop-7")
except ValueError as exc:
    print("blocked merge:", exc)
```

```text
accepted: ['carrier status: in transit']
blocked merge: tenant mismatch
```

Reducers must also reject incompatible scalar writes. Two branches may append independent findings; they should not silently choose different delivery addresses or refund decisions.

```python
def merge_updates(updates: list[dict[str, object]]) -> dict[str, object]:
    merged_findings: list[str] = []
    proposed_address: str | None = None
    for update in updates:
        # List-valued findings are safe to concatenate
        merged_findings.extend(update.get("findings", []))
        if "proposed_address" in update:
            candidate = str(update["proposed_address"])
            # Scalar proposals must agree; a conflict is an error, not a pick
            if proposed_address is not None and candidate != proposed_address:
                raise ValueError("conflicting address proposals")
            proposed_address = candidate
    return {"findings": merged_findings, "proposed_address": proposed_address}

parallel_reads = [{"findings": ["order verified"]}, {"findings": ["policy found"]}]
print("findings:", merge_updates(parallel_reads)["findings"])
try:
    merge_updates([{"proposed_address": "12 Main St"}, {"proposed_address": "19 Oak St"}])
except ValueError as exc:
    print("blocked write merge:", exc)
```

```text
findings: ['order verified', 'policy found']
blocked write merge: conflicting address proposals
```

In production, agents crash, APIs time out, and servers restart. You can't rely on in-memory state. LangGraph's InMemorySaver is convenient for local development, but production deployments usually use a durable checkpointer such as Postgres. The current docs use PostgresSaver.from_conn_string(...); one common pattern is wrapping it in a context manager so the runtime can persist each step and resume by thread_id after a crash.[11]

```python
from langgraph.checkpoint.postgres import PostgresSaver

DB_URI = "postgresql://postgres:postgres@localhost:5442/postgres?sslmode=disable"

with PostgresSaver.from_conn_string(DB_URI) as checkpointer:
    # Run once when initializing the checkpoint tables
    # checkpointer.setup()

    # builder is the uncompiled StateGraph; compile it with the checkpointer attached
    app = builder.compile(checkpointer=checkpointer)

    config = {"configurable": {"thread_id": "user-123-conversation-456"}}
    # initial_state is the input dictionary supplied by the caller
    result = app.invoke(initial_state, config=config)
```

Production tip: Always use a persistent checkpointer in production. Reusing the same thread_id lets the runtime resume from the latest checkpoint and inspect prior state snapshots during debugging.
One more durability rule matters in production: resumed runs replay from a checkpoint boundary instead of jumping back to the exact Python line where execution paused. That means side effects and non-deterministic work should either live in LangGraph tasks or be made idempotent.[11]
The storage boundary, not the model, must enforce idempotency. If the execution node is replayed after a timeout, the same key retrieves the first result rather than issuing a second credit.

```python
# Stand-in for a durable store keyed by idempotency key
processed_credits: dict[str, str] = {}

def issue_credit(idempotency_key: str, cents: int) -> str:
    if idempotency_key in processed_credits:
        return f"reused {processed_credits[idempotency_key]}"
    credit_id = f"credit-{len(processed_credits) + 1}"
    processed_credits[idempotency_key] = credit_id
    return f"created {credit_id} for {cents} cents"

key = "refund:order-7421:v3"
print("first run:", issue_credit(key, 2499))
print("resumed run:", issue_credit(key, 2499))
print("credits issued:", len(processed_credits))
```

```text
first run: created credit-1 for 2499 cents
resumed run: reused credit-1
credits issued: 1
```

Autonomous agents are rarely fully trusted in production. You need breakpoints where a human can approve, modify, or reject an agent's plan. A refund over $500, a delivery address change, or a cancellation that affects a bulk order are all cases where the graph should pause and wait. Approval must apply to one exact proposed action, not to the conversation in general.
LangGraph supports this natively through interrupt(), backed by a checkpointer.[12] Static interrupt_before / interrupt_after breakpoints still exist, but the current docs position them as debugging aids rather than production approval flows. In a production HITL flow, the node emits a structured approval request, the caller sees that payload under __interrupt__, and the graph resumes with Command(resume=...).
```python
import hashlib
import json
from typing import Literal
from langgraph.types import Command, interrupt

def action_digest(action: dict[str, object]) -> str:
    encoded = json.dumps(action, sort_keys=True, separators=(",", ":")).encode()
    return hashlib.sha256(encoded).hexdigest()

def approval_node(state: dict[str, object]) -> Command[Literal["execute", "cancel"]]:
    action = state["proposed_action"]
    digest = action_digest(action)
    decision = interrupt({
        "question": "Approve this refund?",
        "action": action,
        "action_digest": digest,
        "order_version": state["order_version"],
        "idempotency_key": state["idempotency_key"],
        "evidence_refs": state["evidence_refs"],
    })
    approved = (
        decision.get("approved") is True
        and decision.get("action_digest") == digest
        and decision.get("order_version") == state["order_version"]
    )
    return Command(goto="execute" if approved else "cancel")

app = graph.compile(checkpointer=checkpointer)
config = {"configurable": {"thread_id": "refund-review-123"}}

# First call pauses and exposes the interrupt payload to the caller
paused = app.invoke(inputs, config=config)
print(paused["__interrupt__"])

# The UI resumes with the exact displayed digest and order version
decision = {
    "approved": True,
    "action_digest": displayed_digest,
    "order_version": displayed_order_version,
}
app.invoke(Command(resume=decision), config=config)
```

The downstream execute node must re-read the authoritative order version immediately before writing, require another review if it changed, and send the same idempotency key to the payment service. One subtle runtime detail matters a lot in production: after resuming, LangGraph restarts the node from the top rather than continuing from the exact line of the interrupt(). Any side effects before the pause need to be idempotent or moved after the interrupt.
```python
import hashlib
import json

def proposal(amount_cents: int, order_version: int) -> dict[str, object]:
    action = {"kind": "refund", "order_id": "7421", "amount_cents": amount_cents}
    digest = hashlib.sha256(json.dumps(action, sort_keys=True).encode()).hexdigest()
    return {"action": action, "digest": digest, "order_version": order_version}

def can_execute(packet: dict[str, object], decision: dict[str, object], live_version: int) -> bool:
    return (
        decision.get("approved") is True
        and decision.get("digest") == packet["digest"]
        and decision.get("order_version") == packet["order_version"] == live_version
    )

reviewed = proposal(2499, 3)
decision = {"approved": True, "digest": reviewed["digest"], "order_version": 3}
changed_amount = proposal(3499, 3)

print("reviewed action admitted:", can_execute(reviewed, decision, live_version=3))
print("changed amount admitted:", can_execute(changed_amount, decision, live_version=3))
print("stale order admitted:", can_execute(reviewed, decision, live_version=4))
```

```
reviewed action admitted: True
changed amount admitted: False
stale order admitted: False
```
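The execute-side rules described above (re-read the live version, send the proposal back to review if it changed, and reuse one idempotency key for the write) can be sketched in plain Python. The names `FakePaymentService`, `execute_refund`, and `read_live_version` are hypothetical stand-ins for illustration, not part of LangGraph or any payment API.

```python
from typing import Callable

class FakePaymentService:
    """In-memory stand-in (assumption); a real service would persist keys durably."""

    def __init__(self) -> None:
        self.credits: dict[str, str] = {}

    def issue_credit(self, idempotency_key: str, amount_cents: int) -> str:
        # The same key always maps to the first credit that was created.
        if idempotency_key not in self.credits:
            self.credits[idempotency_key] = f"credit-{len(self.credits) + 1}"
        return self.credits[idempotency_key]

def execute_refund(
    approved_version: int,
    amount_cents: int,
    idempotency_key: str,
    read_live_version: Callable[[], int],
    payments: FakePaymentService,
) -> str:
    # Re-read authoritative state immediately before the write.
    if read_live_version() != approved_version:
        return "needs_review"
    return payments.issue_credit(idempotency_key, amount_cents)

payments = FakePaymentService()
key = "refund:order-7421:v3"
first = execute_refund(3, 2499, key, lambda: 3, payments)
replay = execute_refund(3, 2499, key, lambda: 3, payments)
stale = execute_refund(3, 2499, key, lambda: 4, payments)
print(first, replay, stale, len(payments.credits))
```

A replayed call returns the original credit, and a changed order version routes back to review without touching the payment service.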
When orchestrating multiple agents, the choice of coordination contract shapes the whole architecture. In practice, three common patterns appear repeatedly: typed shared state, message-oriented coordination, and supervisor-based routing.
In a shared state model (used by LangGraph), a single typed object acts as a whiteboard. Every node reads from and writes to this schema. That makes intermediate variables explicit: if an early node fetches an order record, a node deep in the graph can still access it without repackaging it into a chat message. The downside is that the schema needs discipline. As the graph grows, the state can become bloated, and careless updates can overwrite important fields.
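To make the whiteboard idea concrete, here is a minimal, framework-free sketch of typed shared state with per-field merge rules. The field names and the `apply_update` helper are illustrative assumptions rather than LangGraph's API; LangGraph expresses a similar idea by attaching reducer functions to state keys.

```python
import operator
from typing import Any, Callable, TypedDict, cast

class SupportState(TypedDict, total=False):
    order_data: dict[str, str]
    evidence_refs: list[str]
    error_count: int

# Hypothetical merge rules: keys listed here are combined with the old value,
# every other key is overwritten by the newest update.
REDUCERS: dict[str, Callable[[Any, Any], Any]] = {
    "evidence_refs": operator.add,  # append new references
    "error_count": operator.add,    # accumulate retries
}

def apply_update(state: SupportState, update: SupportState) -> SupportState:
    merged: dict[str, Any] = dict(state)
    for key, value in update.items():
        if key in REDUCERS and key in merged:
            merged[key] = REDUCERS[key](merged[key], value)
        else:
            merged[key] = value
    return cast(SupportState, merged)

state: SupportState = {"evidence_refs": ["orders/7421"], "error_count": 0}
state = apply_update(state, {"order_data": {"status": "in_transit"}})
state = apply_update(state, {"evidence_refs": ["carrier/1Z999"], "error_count": 1})
print(state["evidence_refs"])
print(state["error_count"])
print(state["order_data"])
```

Declaring the merge rule per key is what keeps a late node from silently overwriting evidence that an earlier node collected.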
In a message-oriented model (common in AutoGen AgentChat teams), the conversation transcript becomes the main contract between agents. Agents coordinate by reading prior messages and producing new ones, which feels natural for delegation and review loops. The trade-off is weaker typing: the transcript can grow quickly, and downstream agents only know what the conversation history includes.
In a supervisor model, a central router or manager decides which specialist agent should act next and tracks overall progress. Use this when you want explicit control over delegation and completion criteria, but expect a central bottleneck and another policy layer to debug.
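A supervisor can be as small as a function that inspects state and names the next worker. The sketch below is a deterministic, in-memory illustration; the worker names, state keys, and `max_turns` default are assumptions, and a real supervisor might consult a model for ambiguous cases.

```python
from typing import Callable

State = dict[str, object]

def lookup_order(state: State) -> State:
    return {**state, "order_status": "in_transit"}

def check_policy(state: State) -> State:
    return {**state, "policy_result": "refund_allowed"}

def draft_reply(state: State) -> State:
    answer = f"Order is {state['order_status']}; {state['policy_result']}."
    return {**state, "final_answer": answer}

WORKERS: dict[str, Callable[[State], State]] = {
    "lookup_order": lookup_order,
    "check_policy": check_policy,
    "draft_reply": draft_reply,
}

def supervisor(state: State) -> str:
    """Pick the next worker from what is still missing in state."""
    if "order_status" not in state:
        return "lookup_order"
    if "policy_result" not in state:
        return "check_policy"
    if "final_answer" not in state:
        return "draft_reply"
    return "done"

def run(state: State, max_turns: int = 6) -> tuple[State, list[str]]:
    trace: list[str] = []
    for _ in range(max_turns):  # hard bound so the loop cannot spin forever
        choice = supervisor(state)
        if choice == "done":
            return state, trace
        trace.append(choice)
        state = WORKERS[choice](state)
    raise RuntimeError("supervisor exceeded max_turns")

final_state, trace = run({"customer_message": "Where is order 7421?"})
print(trace)
print(final_state["final_answer"])
```

Every routing decision passes through one function, which is what makes policy insertion easy and also what makes the supervisor a bottleneck.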
| Approach | Framework | Pros | Cons |
|---|---|---|---|
| Shared State | LangGraph | Typed contracts, easy inspection of intermediates, clean checkpointing. | Schema discipline required, state can bloat over time. |
| Message-Oriented Teams | AutoGen | Natural delegation loops, flexible roles, transcript is easy to inspect. | Weaker typing, prompt history grows, context only exists if the transcript carries it. |
| Supervisor Routing | CrewAI / custom orchestrators | Clear delegation control, centralized progress tracking, easy policy insertion. | Coordinator can become a latency bottleneck and single point of failure. |
Shared state and message passing solve coordination inside one runtime. Production systems often need something else: interoperability between independent systems. Two standards matter here, but they solve different problems.
MCP (Model Context Protocol)[13], introduced by Anthropic in late 2024, standardizes how an assistant connects to tools, resources, and prompts exposed by an MCP server. It's best thought of as a tool and context protocol, not a full remote-agent delegation protocol.
A2A (Agent-to-Agent Protocol)[14][15], announced by Google in April 2025 and now developed as a broader protocol community standard, enables direct communication between autonomous agents from different providers. A2A focuses on task delegation, status updates, and result sharing between independent agent systems. This lets one vendor's planner agent delegate work to another vendor's specialist agent without flattening that specialist into a single tool call.
The two protocols solve different layers of the agent interoperability problem. MCP is a local context and capability protocol: it lets one host discover and invoke tools, read resources, and reuse prompts from any compliant server, whether that server runs as a local subprocess over stdio or is reached over HTTP. A2A is a remote delegation protocol: it lets one autonomous agent hand off a complete sub-task to another independent agent (possibly from a different company), follow progress through polling or streamed updates, and receive structured results or intermediate state.
Without this distinction, teams often end up wrapping every remote capability as an ad-hoc tool call and then wonder why long-running workflows, approvals, and status updates become awkward to implement.
| Concern | MCP (tool/context protocol) | A2A (agent delegation protocol) |
|---|---|---|
| Tool or data access | Strong fit (native) | Not the primary goal |
| Remote specialist agent | Usually wrapped as a tool call | Strong fit (first-class task handoff) |
| Long-running task status | Tool-specific custom polling logic | Built into the protocol (task objects + updates) |
| Cross-vendor interoperability | Tool and data format interoperability | Full agent-to-agent task delegation |
| Human-in-the-loop integration | Handled by the calling host | Can be modeled as explicit review tasks |
In practice, many production systems use both layers together: MCP to give each agent rich local tools and context, and A2A when one agent's planner needs to delegate work to a separate specialist agent running in another environment or organization. The references at the end of this article link to the current protocol specs and community discussions.
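The difference in shape between the two layers can be sketched in a few lines. The `tools/call` method name follows MCP's JSON-RPC convention; the tool name, arguments, and the entire task lifecycle below are simplified assumptions for illustration and do not reproduce the A2A wire schema.

```python
import json
from dataclasses import dataclass, field

# Tool side: one JSON-RPC request invokes one tool and gets one result back.
# "get_order" and its arguments are hypothetical.
mcp_request = {
    "jsonrpc": "2.0",
    "id": 1,
    "method": "tools/call",
    "params": {"name": "get_order", "arguments": {"order_id": "7421"}},
}
print(json.dumps(mcp_request))

# Delegation side: a task object whose lifecycle the caller can observe.
# State names here are illustrative, not the protocol's enum.
@dataclass
class RemoteTask:
    task_id: str
    instruction: str
    state: str = "submitted"
    artifacts: list[str] = field(default_factory=list)

class FakeSpecialistAgent:
    """In-memory stand-in for an independent remote agent."""

    def __init__(self) -> None:
        self._tasks: dict[str, RemoteTask] = {}

    def send(self, task_id: str, instruction: str) -> RemoteTask:
        self._tasks[task_id] = RemoteTask(task_id, instruction)
        return self._tasks[task_id]

    def get(self, task_id: str) -> RemoteTask:
        # Each poll advances the fake task one step through its lifecycle.
        task = self._tasks[task_id]
        if task.state == "submitted":
            task.state = "working"
        elif task.state == "working":
            task.state = "completed"
            task.artifacts.append("address validated")
        return task

agent = FakeSpecialistAgent()
task = agent.send("task-1", "Validate the delivery address for order 7421")
states = [task.state]
for _ in range(5):  # bounded polling instead of waiting forever
    task = agent.get("task-1")
    states.append(task.state)
    if task.state == "completed":
        break
print(states)
print(task.artifacts)
```

The tool call is a single request and response, while the delegated task carries identity and status that outlive any one call.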
Microsoft's AutoGen[1] popularized a conversation-based multi-agent pattern where agents communicate through messages. That design space overlaps with work like ChatDev[16], where agents role-play as software developers in a collaborative environment. In current AgentChat releases, chat-style teams such as SelectorGroupChat, RoundRobinGroupChat, and Swarm coexist with directed workflows through GraphFlow.[17]
For the team abstractions below, the main contract is still the shared conversation history plus team policies such as speaker selection and termination conditions. The implementation below focuses on SelectorGroupChat, which is the clearest example of message-oriented coordination.
This snippet requires autogen-agentchat, autogen-ext[openai], and OpenAI credentials. It constructs a team; running it calls the configured model.
```python
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.conditions import MaxMessageTermination, TextMentionTermination
from autogen_agentchat.teams import SelectorGroupChat
from autogen_ext.models.openai import OpenAIChatCompletionClient

model_client = OpenAIChatCompletionClient(model="gpt-4.1-nano")

# Define specialized agents
researcher = AssistantAgent(
    "Researcher",
    description="Finds relevant facts and sources",
    model_client=model_client,
    system_message="You research topics using web search.",
)

analyst = AssistantAgent(
    "Analyst",
    description="Synthesizes research into a concise answer",
    model_client=model_client,
    system_message="You analyze research findings and end with TERMINATE when done.",
)

termination = TextMentionTermination("TERMINATE") | MaxMessageTermination(max_messages=12)

# SelectorGroupChat uses the model to pick the next speaker
team = SelectorGroupChat(
    [researcher, analyst],
    model_client=model_client,
    termination_condition=termination,
)
```

If you need explicit graph routing inside AutoGen instead of model-selected turn-taking, use GraphFlow rather than a selector-based team.
Choosing the right orchestration framework depends heavily on the use case. While all three frameworks support multi-agent patterns, they prioritize different approaches: LangGraph enforces strict graph-based execution, AutoGen spans conversation-centric teams and GraphFlow workflows, and MetaGPT structures agents around rigid software engineering roles.
| Feature | LangGraph | AutoGen | MetaGPT |
|---|---|---|---|
| Control Flow | Explicit Graph / State Machine | Conversation Teams or directed GraphFlow | SOPs (Standard Operating Procedures) |
| State | Typed Shared State | Shared Conversation History, optionally constrained by flow edges | Shared Environment / Artifacts |
| Best For | Structured, engineering-heavy workflows | Conversation teams, handoffs, or graph workflows inside AgentChat | Software dev, strictly defined roles |
| Complexity | High (Write code for every node) | Medium to High (team presets plus GraphFlow) | High (Class-based definitions) |
Use LangGraph when you need strict control over routing, retries, checkpoints, and approvals. Use AutoGen when a conversation-centric team abstraction is the most natural fit or when GraphFlow gives you enough structure without dropping to a lower-level state machine. Use MetaGPT when you want rigid software roles and SOP-driven artifact generation.
Multi-agent orchestration is useful when it separates independent reads and keeps external writes behind one controlled execution boundary. Here are three plausible production designs, not claims about deployed performance.
A coordinator routes independent inventory and carrier lookups in parallel, then merges their source-backed findings. A permitted fulfillment service, not a free-form specialist, generates a label only after inventory and routing checks satisfy policy. The graph can draft a shipment confirmation or send an exception to review without giving every worker write authority.
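The fan-out and merge step in this design can be sketched with the standard library. The lookup functions return canned data, and the finding labels and `merge_findings` rule are assumptions chosen for illustration; the point is that branches only read, and the merge rejects anything without a source reference.

```python
from concurrent.futures import ThreadPoolExecutor

def inventory_lookup(order_id: str) -> dict[str, str]:
    # Stand-in for a read-only inventory query.
    return {"source": f"inventory/{order_id}", "finding": "in_stock"}

def carrier_lookup(order_id: str) -> dict[str, str]:
    # Stand-in for a read-only carrier routing query.
    return {"source": f"carrier/{order_id}", "finding": "route_available"}

def merge_findings(findings: list[dict[str, str]]) -> dict[str, object]:
    # Reject any branch result that does not say where it came from.
    for item in findings:
        if not item.get("source"):
            raise ValueError("finding without a source reference")
    observed = {item["finding"] for item in findings}
    return {
        "evidence_refs": sorted(item["source"] for item in findings),
        "ready_for_label": observed == {"in_stock", "route_available"},
    }

# The two reads are independent, so they can run concurrently.
with ThreadPoolExecutor(max_workers=2) as pool:
    futures = [pool.submit(fn, "7421") for fn in (inventory_lookup, carrier_lookup)]
    results = [future.result(timeout=5) for future in futures]

merged = merge_findings(results)
print(merged)
```

Label generation would then be a separate, permitted service that acts only when `ready_for_label` is true.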
When a customer requests a return, independent branches can retrieve purchase eligibility, the applicable return policy, and approved risk signals scoped to the customer and tenant. A reducer validates sources and produces a proposed action. If policy requires approval, the reviewer sees the exact amount, evidence references, and action digest; a payment service issues credit only after revalidation with an idempotency key.
When a carrier reports a failed delivery, a triage node classifies the exception. Address validation can produce a correction proposal, while a notification branch drafts a message. Changing an address or scheduling a new attempt remains an authorized action bound to verified order state. After a bounded number of failed proposals, the graph escalates with evidence instead of continuing indefinitely.
After this article, you should be able to defend these design choices in a system review or interview:
| Skill | What good looks like |
|---|---|
| When to go multi-agent | Argue, with the Cognition and Anthropic positions, why multi-agent fits separable parallel work but hurts tightly coupled tasks given the 10 to 15x token cost. |
| Chain limits | Explain why linear agent chains fail for complex workflows with conditional routing, parallel work, or cycles. |
| Graph shape | Distinguish DAG-style workflows from cyclic state-machine orchestration. |
| LangGraph implementation | Build a StateGraph with typed state, nodes, reducers, fixed edges, and conditional edges. |
| Team topology | Compare supervisor, hierarchical, swarm, and map-reduce patterns by control, latency, and debuggability. |
| Coordination contract | Explain typed shared state versus message-oriented coordination. |
| Protocol boundary | Distinguish MCP tool/context access from A2A remote-agent delegation. |
| Production control | Add checkpointing, termination guards, evidence validation, and approvals bound to exact risky actions. |
How do you handle infinite loops in a cyclic agent graph? Once you allow retries or review loops, you are no longer dealing with a strict DAG. LangGraph can model cyclic graphs and state machines, but production systems need hard termination through recursion_limit, step_count, max_turns, max retries, deadlines, or a forced transition to failure handling or human review.
When would you choose a supervisor pattern over a flat swarm pattern? Choose a supervisor when you need centralized control, strict process adherence, policy insertion, or arbitration between workers with overlapping capabilities. A swarm or peer-to-peer handoff pattern fits loosely coupled specialists, but it is harder to debug because routing decisions are decentralized.
Why is typed shared state often easier than message passing for structured workflows? It is not universally better, but it gives downstream nodes reliable fields such as order_data, approval_status, policy_result, and error_count without asking them to infer state from a growing transcript. Message-oriented teams can work well, but they need stronger serialization, summaries, and termination policies as history grows.
How does human-in-the-loop approval fit into graph architecture? Model it as an explicit approval node. In LangGraph, that node can call interrupt() to emit a structured review payload and pause execution through a checkpointer. The payload should contain the proposed action digest, evidence, source version, and idempotency key. After a human decision resumes the workflow with Command(resume=...), the execution node still revalidates authoritative state before writing.
| Symptom | Cause | Fix |
|---|---|---|
| Multi-agent prototype is slower and worse than one good prompt. | Graph was split into too many tiny agents with no real parallelism or separation of concerns. | Collapse adjacent steps, keep one agent for tightly coupled reasoning, and add workers only where roles or tools truly differ. |
| Routing burns tokens before useful work starts. | Every handoff or branch decision uses a large model, even for rules you can write down. | Use deterministic routing for explicit rules, and reserve model-based routing for real language ambiguity. |
| Retry loop never exits cleanly. | Cyclic graph has no hard guard such as recursion_limit, retry budget, or deadline. | Add termination counters in state and force failure handling or human review when the limit is hit. |
| State grows until every node becomes expensive. | Large records, raw tool payloads, or full transcripts are copied into shared state. | Store big objects by ID, summarize long histories, and keep only fields downstream nodes actually read. |
| Downstream worker breaks after an upstream wording change. | Agents are handing off prose instead of typed state fields or schema-constrained outputs. | Put structured contracts between workers: typed state keys, validated JSON, or reducer-backed message formats. |
| Approved refund changes before execution. | Approval was recorded as a Boolean instead of being bound to an action digest and source version. | Revalidate the exact proposal at execution time and issue the write through an idempotency key. |
| Remote integrations are awkward and brittle. | MCP tool access and A2A delegation were treated as interchangeable. | Use MCP for local tool and context access, and A2A when one independent agent system must delegate to another. |
If you allow cyclic graphs (for example, "if the tracking lookup fails, try again with a different query"), you must enforce termination. Common guards include:
- A recursion_limit in the graph invoke or stream config.
- A step_count or error_count field in state that increments each retry.
- A max_turns guard in conversation loops that forces a transition to failure handling or human review.

Without one of these, two agents can pass a task back and forth forever.
```python
def lookup_with_budget(statuses: list[str], max_attempts: int) -> str:
    for attempt, status in enumerate(statuses[:max_attempts], start=1):
        print(f"attempt {attempt}: {status}")
        if status == "verified":
            return "continue to formatter"
    return "escalate for review"

outcome = lookup_with_budget(["not_found", "timeout", "not_found"], max_attempts=2)
print("outcome:", outcome)
```

```
attempt 1: not_found
attempt 2: timeout
outcome: escalate for review
```

A frequent failure mode is passing the entire message history to every agent. If the conversation has twenty turns, every node sees all twenty even when it only needs the latest order ID. Fixes include:
- Keeping the messages list for the transcript but adding dedicated state keys (like order_data) for data that downstream nodes need directly.

The state can expose a minimal verified projection while keeping sensitive record content behind the application authorization boundary:
```python
order_store = {
    "orders/shop-7/7421": {
        "email": "[email protected]",
        "status": "in_transit",
        "tracking_number": "1Z999",
    }
}

def project_for_graph(order_ref: str, tenant_id: str) -> dict[str, str]:
    if not order_ref.startswith(f"orders/{tenant_id}/"):
        raise PermissionError("cross-tenant reference")
    record = order_store[order_ref]
    return {"order_ref": order_ref, "verified_status": record["status"]}

state_update = project_for_graph("orders/shop-7/7421", "shop-7")
print("state update:", state_update)
print("contains email:", "email" in state_update)
```

```
state update: {'order_ref': 'orders/shop-7/7421', 'verified_status': 'in_transit'}
contains email: False
```

Using an LLM to decide the next step when a simple if/else or regex would suffice increases cost and reduces reliability. A good rule: use deterministic routing for rules you can write down, and use LLM-based routing only when the classification genuinely requires language understanding.
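The rule-first idea above can be sketched as a router that tries written-down rules and only then falls back to a classifier. The keywords, route names, regex, and the `classify_with_model` placeholder are all assumptions for illustration; the placeholder returns a fixed label where a real system would call a model.

```python
import re
from typing import Callable

# Matches phrases such as "order 7421" or "order #7421".
ORDER_ID = re.compile(r"\border[ #-]*(\d{3,})\b", re.IGNORECASE)

def classify_with_model(message: str) -> str:
    """Placeholder for a model call; returns a fixed label in this sketch."""
    return "general_support"

def route(message: str, fallback: Callable[[str], str] = classify_with_model) -> str:
    text = message.lower()
    # Deterministic rules run first and cost no tokens.
    if "refund" in text or "money back" in text:
        return "refund_flow"
    if ORDER_ID.search(message) or "tracking" in text:
        return "order_lookup"
    # Only ambiguous messages reach the model-based classifier.
    return fallback(message)

print(route("Where is order #7421?"))
print(route("I want a refund for order 7421"))
print(route("My shirt feels itchy"))
```

Because the fallback is injected, tests can count how often the model path is taken and flag rules that should be written down instead.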
To check your understanding, try this small design exercise:
Scenario: A customer messages your store saying, "I ordered a blue shirt last week but received a red one instead. I also want to know if I can exchange it for a different size."
Task: Sketch a LangGraph StateGraph with at least three nodes and one conditional edge. Define the TypedDict state, name the nodes, and write the routing logic. Then answer these questions:
- Where would you call interrupt() if the refund amount exceeds $100?
- A reasonable state includes order_ref, minimal verified order fields, policy_result, customer_message, proposed_action, approval_status, and final_answer.

State schema defines the contract between agents. Keep it strict, typed, tenant-scoped, and limited to necessary evidence or references.

This article closes the Advanced Agents and Retrieval section. You started with single agents and orchestration basics; you now know how to wire specialized agents into explicit graphs, choose a topology from the workflow shape, share typed state, persist progress, gate exact risky actions with bound approval, and decide when not to build a multi-agent system at all.
Next: Continue to Inference: TTFT, TPS & KV Cache, the start of the Inference and Production Scale section. Multi-agent paths can multiply model calls per user interaction, and the Anthropic research system above reported roughly 15 times the tokens of a single chat. The next article explains the latency (TTFT), throughput (TPS), and memory (KV cache) realities that decide whether your agent graphs can run fast and affordably at production scale.
[1] AutoGen: Enabling Next-Gen LLM Applications via Multi-Agent Conversation. Wu, Q., et al. · 2023
[2] MetaGPT: Meta Programming for A Multi-Agent Collaborative Framework. Hong, S., et al. · 2023
[3] Introducing Devin, the first AI software engineer. Cognition Labs · 2024
[4] Don't Build Multi-Agents. Yan, W. (Cognition) · 2025
[5] Multi-Agents: What's Actually Working. Yan, W. (Cognition) · 2026
[6] How we built our multi-agent research system. Hadfield, J., Zhang, B., Lien, K., et al. (Anthropic) · 2025
[7] LangGraph Graph API. LangChain · 2026
[8] Swarm: Educational Framework for Multi-Agent Orchestration. OpenAI · 2024
[9] OpenAI Agents SDK. OpenAI · 2025
[10] OpenAI Agents SDK Handoffs. OpenAI · 2026
[11] LangGraph Persistence. LangChain · 2026
[12] LangGraph Interrupts. LangChain · 2024
[13] Introducing the Model Context Protocol. Anthropic · 2024
[14] A2A Protocol Specification. A2A Project · 2025
[15] A2A Protocol Ships v1.0: Production-Ready Standard for Agent-to-Agent Communication. A2A Protocol Community · 2026
[16] Communicative Agents for Software Development. Qian, C., et al. · 2023
[17] GraphFlow (Workflows). Microsoft AutoGen · 2026