Building Durable AI Agents with Temporal — Crash-Proof Long-Running Workflows

The bottom line: Every AI agent running in production will eventually crash. Network timeouts, API rate limits, pod restarts, and deployment rollouts all terminate in-flight agent runs silently, and an agent that loses state mid-task isn’t production-ready. Temporal’s durable execution model records the result of each completed step, so after a failure the agent resumes from the last recorded step instead of starting over. This guide walks through six practical steps with working code.
Why Durable Execution Matters for AI Agents
A typical agent workflow involves multiple LLM calls, tool invocations, and state transitions over seconds to minutes. Here’s what happens when any of those steps fails without durability:
| Failure event | Without durability | With durable execution |
|---|---|---|
| LLM API timeout (10% of calls [1]) | Agent loses partial progress, user retries | Worker restarts from last activity |
| Pod killed during scale-down | Entire agent state deleted | Workflow resumes on new worker |
| Code deploy mid-agent-run | Process terminated, state lost | New code picks up at checkpoint |
| Database connection drop | Tool call fails, no retry | Activity retries with backoff |
| 30-minute research task restart | Must redo all LLM calls | Replays from cached results |
Durable execution achieves this through event sourcing: every workflow step writes its result to a history store before proceeding. When a crash occurs, the worker reads the event log and replays deterministic computation — LLM calls are cached from the first execution, so you don’t pay for them again [2].
[1] Temporal, “Durable Execution for AI Agents” — temporal.io/ai
[2] OpenTelemetry GenAI Semantic Conventions, “Agent Tracing with Durable Execution” — opentelemetry.io/docs/specs/semconv/gen-ai/
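To make the replay idea concrete, here is a minimal sketch in plain Python. It is a toy model, not Temporal's implementation: `ToyHistory`, `run_workflow`, and the two fake steps are hypothetical names, and the "history store" is just an in-memory list. It only shows that results recorded before a crash are reused rather than recomputed.

```python
from typing import Any, Callable, List, Optional


class ToyHistory:
    """Append-only list of step results, standing in for a durable history store."""

    def __init__(self) -> None:
        self.events: List[Any] = []


def run_workflow(
    history: ToyHistory,
    steps: List[Callable[[Any], Any]],
    crash_before: Optional[int] = None,
) -> Any:
    """Run steps in order, reusing recorded results and executing only the rest."""
    value = None
    for index, step in enumerate(steps):
        if index < len(history.events):
            value = history.events[index]  # replay: take the recorded result
            continue
        if crash_before == index:
            raise RuntimeError(f"simulated crash before step {index}")
        value = step(value)
        history.events.append(value)  # record the result before moving on
    return value


calls = {"search": 0, "llm": 0}


def search(_: Any) -> List[str]:
    calls["search"] += 1
    return ["source-a", "source-b"]


def llm(sources: List[str]) -> str:
    calls["llm"] += 1
    return f"summary of {len(sources)} sources"


history = ToyHistory()
steps = [search, llm]
try:
    run_workflow(history, steps, crash_before=1)  # first run dies after the search
except RuntimeError:
    pass
result = run_workflow(history, steps)  # second run replays the search result
print(result, calls)
```

The search step runs once even though the workflow function runs twice, which is the property the table above relies on.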
What You’re Building
A multi-step research agent that:
- Receives a research query and retrieves web sources
- Calls an LLM to synthesize findings
- Generates a structured report
- Sends the result back to the caller
The agent runs inside a Temporal workflow, so its progress survives infrastructure failures between steps. If the worker crashes during the LLM call, the results of earlier steps are kept and only the interrupted call is retried once a new worker picks up the task.
The patterns shown here can be adapted to LangGraph, the OpenAI Agents SDK, or a custom agent implementation.
Prerequisites
- Python 3.10+
- Docker (for local Temporal Server), or a Temporal Cloud account
- A Temporal Server running locally:
  temporal server start-dev
- OpenAI API key (or any LLM provider supported by your agent framework)
- Temporal Python SDK:
  pip install temporalio
Step 1: Setting Up Temporal
Start a local Temporal Server for development:
temporal server start-dev \
--db-filename /tmp/temporal.db \
--namespace default
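This runs a single-process development server that persists state to the SQLite file given by --db-filename; without that flag, the dev server keeps state in memory only and loses it on exit. For production, you’d run Temporal Server in Docker or use Temporal Cloud.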
Verify it’s running:
temporal operator namespace list
# Expected: default (active)
Install the Python SDK:
pip install temporalio
For LangGraph integration, you’ll also need:
pip install langgraph langchain-openai
Step 2: Defining the Workflow-Activity Boundary
The fundamental pattern in Temporal is Workflows call Activities. Workflows are deterministic — they can’t make network calls directly. Activities are non-deterministic — they’re where LLM calls, API requests, and I/O live.
# activities.py: non-deterministic operations
from temporalio import activity


@activity.defn
async def search_web(query: str) -> list[dict]:
    """Activity: make an HTTP call. Safe to retry."""
    import httpx

    async with httpx.AsyncClient() as client:
        resp = await client.get(
            "https://api.duckduckgo.com",
            params={"q": query, "format": "json"},
            timeout=15,
        )
        resp.raise_for_status()
        # The Instant Answer API returns related entries under "RelatedTopics"
        return resp.json().get("RelatedTopics", [])


@activity.defn
async def call_llm(prompt: str, system: str) -> str:
    """Activity: LLM call. Retried on failure; its recorded result is reused on replay."""
    from openai import AsyncOpenAI

    client = AsyncOpenAI()  # reads OPENAI_API_KEY from the environment
    resp = await client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": system},
            {"role": "user", "content": prompt},
        ],
        timeout=30,
    )
    return resp.choices[0].message.content or ""


@activity.defn
async def generate_report(synthesis: str, format: str = "markdown") -> str:
    """Activity: format the final output."""
    return f"# Research Report\n\n{synthesis}\n\n---\n*Generated by durable agent*"
Key rule: Activities can be retried independently. Temporal replays the workflow up to the last completed activity, then re-executes only the failed one [3].
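The determinism requirement on workflows is easier to see with a small model. The sketch below is plain Python, not the Temporal SDK: `replay`, `NonDeterminismError`, and both workflow functions are hypothetical, and a counter stands in for any value that changes between runs (wall-clock time, random numbers, an un-recorded network response). It shows why orchestration code that branches on such a value cannot be replayed against its own history.

```python
import itertools
from typing import Callable, Iterator, List


class NonDeterminismError(Exception):
    """Raised when replayed code schedules different activities than the history."""


def replay(workflow_fn: Callable[[], Iterator[str]], recorded: List[str]) -> List[str]:
    """Re-run the orchestration and check it issues the same activity sequence."""
    issued = list(workflow_fn())
    if issued != recorded:
        raise NonDeterminismError(f"history has {recorded}, replay issued {issued}")
    return issued


def deterministic_workflow() -> Iterator[str]:
    # Same sequence on every run, so replay always matches the history.
    yield "search_web"
    yield "call_llm"
    yield "generate_report"


# Stand-in for a value that differs between runs, such as time.time().
_clock = itertools.count()


def clock_dependent_workflow() -> Iterator[str]:
    yield "search_web"
    if next(_clock) % 2 == 0:  # the branch depends on when the code runs
        yield "call_llm"
    yield "generate_report"


recorded = list(deterministic_workflow())
replay(deterministic_workflow, recorded)  # matches, no error

recorded = list(clock_dependent_workflow())
try:
    replay(clock_dependent_workflow, recorded)
except NonDeterminismError as err:
    print("replay failed:", err)
```

Moving the run-dependent work into an activity fixes the second case, because the activity's result is recorded once and read back on replay.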
Step 3: Writing the Durable Workflow
The workflow is the orchestration logic — it’s deterministic and must produce the same output given the same inputs, so Temporal can replay it safely.
# workflows.py: deterministic orchestration
from datetime import timedelta

from temporalio import workflow
from temporalio.common import RetryPolicy  # RetryPolicy lives in temporalio.common

# Import activities through the sandbox passthrough so the workflow sandbox
# does not re-import them (and their dependencies) on every workflow run
with workflow.unsafe.imports_passed_through():
    from activities import search_web, call_llm, generate_report


@workflow.defn
class ResearchAgentWorkflow:
    @workflow.run
    async def run(self, query: str) -> str:
        # Step 1: Search. This is an Activity (network call)
        results = await workflow.execute_activity(
            search_web,
            query,
            start_to_close_timeout=timedelta(seconds=20),
            retry_policy=RetryPolicy(
                initial_interval=timedelta(seconds=1),
                maximum_interval=timedelta(seconds=10),
                maximum_attempts=3,
            ),
        )
        # Step 2: Synthesize. Another Activity (LLM call)
        synthesis = await workflow.execute_activity(
            call_llm,
            args=[
                f"Sources: {results}\n\nSummarize findings for: {query}",
                "You are a research assistant. Synthesize sources into concise findings.",
            ],
            start_to_close_timeout=timedelta(seconds=60),
            retry_policy=RetryPolicy(
                initial_interval=timedelta(seconds=2),
                maximum_attempts=5,
            ),
        )
        # Step 3: Generate report. Final Activity (uses the default retry policy)
        report = await workflow.execute_activity(
            generate_report,
            synthesis,
            start_to_close_timeout=timedelta(seconds=10),
        )
        return report
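Notice that each execute_activity call sets an explicit start_to_close_timeout, and the two network-bound calls also set a retry policy; generate_report falls back to Temporal’s default retry policy. An attempt fails when the activity raises an exception or its timeout fires, and Temporal then retries it automatically according to the policy [3].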
[3] Temporal Python SDK Docs, “Activity Execution” — docs.temporal.io/develop/python/activity-execution
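It can help to see what wait times a retry policy implies. The helper below is a hypothetical sketch (`retry_delays` is not part of the Temporal SDK) that assumes exponential backoff with Temporal's documented defaults: a backoff coefficient of 2.0 and a maximum interval of 100 times the initial interval when none is given. It also assumes `maximum_attempts` is a positive number.

```python
from datetime import timedelta
from typing import List, Optional


def retry_delays(
    initial_interval: timedelta,
    maximum_attempts: int,
    backoff_coefficient: float = 2.0,
    maximum_interval: Optional[timedelta] = None,
) -> List[timedelta]:
    """Return the wait before each retry; N attempts means at most N - 1 retries."""
    cap = maximum_interval if maximum_interval is not None else initial_interval * 100
    delays = []
    for retry_number in range(maximum_attempts - 1):
        delay = initial_interval * (backoff_coefficient ** retry_number)
        delays.append(min(delay, cap))
    return delays


# Values taken from the search_web and call_llm policies in the workflow above.
search_delays = retry_delays(
    timedelta(seconds=1), 3, maximum_interval=timedelta(seconds=10)
)
llm_delays = retry_delays(timedelta(seconds=2), 5)

print([d.total_seconds() for d in search_delays])  # [1.0, 2.0]
print([d.total_seconds() for d in llm_delays])     # [2.0, 4.0, 8.0, 16.0]
```

Under these assumptions the LLM call can spend up to 30 seconds waiting between attempts, on top of up to 60 seconds per attempt, which is worth keeping in mind when choosing a caller-side deadline.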
Step 4: Running the Worker
The worker connects to Temporal Server and polls for workflow tasks:
# worker.py
import asyncio

from temporalio.client import Client
from temporalio.worker import Worker

from workflows import ResearchAgentWorkflow
from activities import search_web, call_llm, generate_report


async def main():
    client = await Client.connect("localhost:7233", namespace="default")
    worker = Worker(
        client,
        task_queue="research-agent-queue",
        workflows=[ResearchAgentWorkflow],
        activities=[search_web, call_llm, generate_report],
    )
    print("Worker started, waiting for workflow tasks...")
    await worker.run()


if __name__ == "__main__":
    asyncio.run(main())
Start the worker in one terminal:
python worker.py
In another terminal, start a workflow:
# client.py
import asyncio

from temporalio.client import Client

from workflows import ResearchAgentWorkflow


async def main():
    client = await Client.connect("localhost:7233", namespace="default")
    handle = await client.start_workflow(
        ResearchAgentWorkflow.run,
        "LLM agent orchestration patterns in production 2026",
        id="research-001",
        task_queue="research-agent-queue",
    )
    result = await handle.result()
    print(f"Report:\n{result}")


if __name__ == "__main__":
    asyncio.run(main())
python client.py
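Test durability: Kill the worker process mid-execution, then restart it. The workflow resumes after the last completed activity: completed results are read back from history, so finished LLM calls are not repeated. An activity that was still in flight when the worker died is retried once its timeout expires.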
Step 5: Wrapping LangGraph in a Temporal Activity
If you already have a LangGraph agent, you don’t need to rewrite it. Wrap the entire graph execution in a single Temporal activity:
# langgraph_activity.py
from temporalio import activity
from langgraph.graph import END, StateGraph, MessagesState
from langchain_openai import ChatOpenAI
from langgraph.checkpoint.memory import MemorySaver


@activity.defn
async def run_langgraph_agent(query: str) -> dict:
    """Wraps an entire LangGraph agent run as a single Temporal activity."""
    model = ChatOpenAI(model="gpt-4o", temperature=0)

    # Build a simple graph (named `builder` to avoid confusion with temporalio.workflow)
    builder = StateGraph(MessagesState)

    async def call_model(state: MessagesState):
        response = await model.ainvoke(state["messages"])
        return {"messages": [response]}

    builder.add_node("agent", call_model)
    builder.set_entry_point("agent")
    builder.add_edge("agent", END)  # give the graph an explicit end
    # MemorySaver is in-process only; it does not survive an activity retry
    graph = builder.compile(checkpointer=MemorySaver())

    # Run with config
    config = {"configurable": {"thread_id": f"langgraph-{activity.info().activity_id}"}}
    result = await graph.ainvoke(
        {"messages": [("user", query)]},
        config,
    )
    return {"result": result["messages"][-1].content}
This pattern adds Temporal’s durability around an existing agent: the LangGraph run becomes a single atomic step in the larger workflow. If the worker crashes mid-graph, the entire activity retries from scratch, but the workflow’s event history preserves the results of every activity that completed before it [4].
For finer-grained durability, split the LangGraph into multiple activities — one per graph node — and orchestrate them from the Temporal workflow. This costs more in event history but gives you per-node checkpointing.
[4] Temporal, “LangGraph Integration” — docs.temporal.io/develop/python/integrations/langgraph
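The trade-off between one activity for the whole graph and one activity per node can be estimated with a small simulation. This is a plain-Python sketch with hypothetical node names and a hypothetical helper, `count_node_runs`; it assumes a single node fails on its first attempt and succeeds on retry, and it counts how many node executions (LLM calls, in practice) each strategy pays for.

```python
from typing import List


def count_node_runs(nodes: List[str], fails_once: str, per_node: bool) -> int:
    """Count node executions, including the failed one, until the graph finishes."""
    runs = 0
    has_failed = False
    checkpoint = 0  # index of the first node whose result is not yet recorded
    while checkpoint < len(nodes):
        index = checkpoint
        attempt_ok = True
        while index < len(nodes):
            runs += 1
            if nodes[index] == fails_once and not has_failed:
                has_failed = True
                attempt_ok = False
                break  # this attempt is abandoned and retried from the checkpoint
            index += 1
            if per_node:
                checkpoint = index  # each node's result is recorded as it completes
        if attempt_ok:
            checkpoint = len(nodes)
    return runs


graph_nodes = ["plan", "search", "synthesize", "review"]  # hypothetical graph

whole_graph = count_node_runs(graph_nodes, "synthesize", per_node=False)
node_level = count_node_runs(graph_nodes, "synthesize", per_node=True)
print(whole_graph, node_level)  # 7 5
```

With the whole graph in one activity, the two nodes before the failure run twice; with per-node activities only the failed node is repeated, at the cost of more events in the workflow history.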
Step 6: Human-in-the-Loop with Signals
Durable execution enables proper human-in-the-loop patterns. When the agent needs approval, it pauses and waits for a signal — surviving a worker crash while waiting:
import asyncio
from datetime import timedelta
from typing import Optional

from temporalio import workflow

# Import activities at module level, passed through the workflow sandbox
with workflow.unsafe.imports_passed_through():
    from activities import search_web, call_llm, generate_report


@workflow.defn
class ApprovalGateWorkflow:
    def __init__(self) -> None:
        self._approved = False  # becomes True once any decision arrives
        self._decision: Optional[str] = None

    @workflow.signal
    def approve(self, decision: str) -> None:
        """Signal handler, called externally to approve or reject."""
        self._approved = True
        self._decision = decision

    @workflow.query
    def status(self) -> dict:
        """Query handler: check status without modifying state."""
        return {"approved": self._approved, "decision": self._decision}

    @workflow.run
    async def run(self, query: str) -> str:
        # Phase 1: Research
        results = await workflow.execute_activity(
            search_web, query,
            start_to_close_timeout=timedelta(seconds=20),
        )
        synthesis = await workflow.execute_activity(
            call_llm,
            # Multiple positional arguments must be passed via args=
            args=[f"Sources: {results}\n\nQuery: {query}", "Research assistant."],
            start_to_close_timeout=timedelta(seconds=60),
        )
        # Phase 2: Wait for human approval (survives crashes)
        workflow.logger.info("Waiting for human approval...")
        try:
            await workflow.wait_condition(
                lambda: self._approved,
                timeout=timedelta(hours=24),
            )
        except asyncio.TimeoutError:
            # wait_condition raises when the timeout elapses without a signal
            return "Research timed out waiting for human approval."
        if self._decision == "reject":
            return "Research rejected by human reviewer."
        # Phase 3: Generate report (only after approval)
        report = await workflow.execute_activity(
            generate_report, synthesis,
            start_to_close_timeout=timedelta(seconds=10),
        )
        return report
Send the approval signal from your UI:
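# signal_client.py
import asyncio

from temporalio.client import Client

from workflows import ApprovalGateWorkflow  # assumes the workflow lives in workflows.py


async def send_approval():
    client = await Client.connect("localhost:7233")
    # Use the ID that the ApprovalGateWorkflow run was started with
    handle = client.get_workflow_handle("research-001")
    await handle.signal(ApprovalGateWorkflow.approve, "approve")
    status = await handle.query(ApprovalGateWorkflow.status)
    print(f"Status: {status}")


if __name__ == "__main__":
    asyncio.run(send_approval())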
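The key insight: workflow.wait_condition() suspends the workflow without tying up a worker while it waits. The wait is recorded in the workflow’s history, so even if the worker restarts, the workflow keeps waiting once a new worker picks up the task. The 24-hour timeout ensures the workflow doesn’t hang forever if the human never responds; when it elapses, wait_condition raises asyncio.TimeoutError, which the workflow should catch and turn into an explicit outcome.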
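The control flow of an approval gate with a deadline can be tried out with nothing but asyncio. The sketch below is an in-memory analogue, not Temporal code: `approval_gate` and `demo` are hypothetical names, an `asyncio.Event` stands in for the signal, and the timeouts are shortened to milliseconds. Unlike the Temporal version, this wait does not survive a process restart; it only illustrates the two outcomes a workflow needs to handle.

```python
import asyncio
from typing import List


async def approval_gate(decided: asyncio.Event, timeout_seconds: float) -> str:
    """Wait for a decision, or give up once the deadline passes."""
    try:
        await asyncio.wait_for(decided.wait(), timeout=timeout_seconds)
    except asyncio.TimeoutError:
        return "timed out"
    return "approved"


async def demo() -> List[str]:
    # Case 1: the reviewer responds shortly after the gate starts waiting.
    responded = asyncio.Event()
    asyncio.get_running_loop().call_later(0.01, responded.set)
    first = await approval_gate(responded, timeout_seconds=1.0)

    # Case 2: nobody ever responds, so the deadline fires.
    second = await approval_gate(asyncio.Event(), timeout_seconds=0.01)
    return [first, second]


outcomes = asyncio.run(demo())
print(outcomes)  # ['approved', 'timed out']
```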
Alternative: DBOS for Postgres-Native Durability
If you’re already running Postgres and prefer a simpler setup, DBOS offers durable execution without a separate server process [5]:
from dbos import DBOS, DBOSConfiguredInstance


@DBOS.dbos_class()
class DurableAgent(DBOSConfiguredInstance):
    # Workflows defined as instance methods need a DBOSConfiguredInstance
    # base class with a unique config_name per instance
    def __init__(self, name: str = "durable-agent"):
        super().__init__(config_name=name)

    @DBOS.workflow()
    def research_and_report(self, query: str) -> str:
        results = self.search_web(query)
        synthesis = self.call_llm(f"Synthesize: {results}")
        return self.format_report(synthesis)

    @DBOS.step()
    def search_web(self, query: str) -> list[dict]:
        import httpx
        return httpx.get("https://api.example.com/search", params={"q": query}).json()

    @DBOS.step()
    def call_llm(self, prompt: str) -> str:
        # LLM call: its recorded output is reused when the workflow recovers
        ...

    @DBOS.step()
    def format_report(self, content: str) -> str:
        return f"# Report\n\n{content}"
DBOS stores the workflow event log directly in your Postgres database. The tradeoff: Postgres can become a bottleneck at high throughput, while Temporal’s purpose-built event store handles millions of workflows [5].
| Feature | Temporal | DBOS |
|---|---|---|
| Durability model | Purpose-built event store | Postgres native |
| Setup complexity | Dedicated server required | pip install + existing Postgres |
| Max throughput | Millions of workflows | Tens of thousands (Postgres-bound) |
| SDK maturity | Python SDK v1.x (stable) | Python SDK v0.x (active development) |
| LangGraph integration | Official | Community |
| Self-hosted option | Yes (Docker Compose) | Yes (single process) |
Choose Temporal for production-scale multi-agent systems. Choose DBOS for lightweight deployments where you already have Postgres and don’t want to run another service.
[5] DBOS, “Durable Workflow Quickstart” — docs.dbos.dev/ai/ai-quickstart
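The idea of keeping step results in a relational database can be sketched with the standard library alone. The example below uses sqlite3 as a stand-in for Postgres and is not how DBOS is implemented: the `step_results` table, `durable_step`, and `fake_llm` are hypothetical. It shows only the core mechanism, which is to look up a recorded output before running a step and to commit the output before returning it.

```python
import json
import sqlite3
from typing import Any, Callable


def make_store() -> sqlite3.Connection:
    """Create an in-memory table of step outputs keyed by workflow and step."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE step_results ("
        "workflow_id TEXT, step_name TEXT, output TEXT, "
        "PRIMARY KEY (workflow_id, step_name))"
    )
    return conn


def durable_step(
    conn: sqlite3.Connection,
    workflow_id: str,
    step_name: str,
    fn: Callable[..., Any],
    *args: Any,
) -> Any:
    """Return the recorded output if present; otherwise run the step and record it."""
    row = conn.execute(
        "SELECT output FROM step_results WHERE workflow_id = ? AND step_name = ?",
        (workflow_id, step_name),
    ).fetchone()
    if row is not None:
        return json.loads(row[0])
    output = fn(*args)
    conn.execute(
        "INSERT INTO step_results VALUES (?, ?, ?)",
        (workflow_id, step_name, json.dumps(output)),
    )
    conn.commit()  # the result is durable before the workflow moves on
    return output


llm_calls = {"count": 0}


def fake_llm(prompt: str) -> str:
    llm_calls["count"] += 1
    return f"summary for: {prompt}"


conn = make_store()
first = durable_step(conn, "wf-1", "call_llm", fake_llm, "durable agents")
second = durable_step(conn, "wf-1", "call_llm", fake_llm, "durable agents")
print(first == second, llm_calls["count"])  # True 1
```

Every step costs at least one read and one write against the same database that serves the application, which is where the throughput trade-off mentioned above comes from.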
Observability: Tracing Durable Agent Runs
Every Temporal workflow produces a detailed event history — you can inspect it via the Temporal Web UI (http://localhost:8233) or programmatically:
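# Trace a workflow's execution
import asyncio
from temporalio.client import Client

async def print_activity_events(workflow_id: str) -> None:
    client = await Client.connect("localhost:7233")
    handle = client.get_workflow_handle(workflow_id)
    history = await handle.fetch_history()
    for event in history.events:
        # HasField tests which attributes field is actually set on this event
        if event.HasField("activity_task_scheduled_event_attributes"):
            print(f"Scheduled: {event.activity_task_scheduled_event_attributes.activity_type.name}")
        elif event.HasField("activity_task_completed_event_attributes"):
            print(f"Completed: {event.activity_task_completed_event_attributes.result}")

asyncio.run(print_activity_events("research-001"))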
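For OpenTelemetry integration, the Temporal Python SDK provides a tracing interceptor (install the temporalio[opentelemetry] extra) that emits spans for workflow and activity execution. Attributes under gen_ai.* are not added by Temporal itself; they come from instrumenting your LLM client according to the GenAI semantic conventions [2]:
from temporalio.client import Client
from temporalio.contrib.opentelemetry import TracingInterceptor
# Pass the interceptor when connecting; workers created from this client use it too:
# client = await Client.connect("localhost:7233", interceptors=[TracingInterceptor()])
# Workflow and activity executions then appear as spans in your configured tracer provider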
Pair this with Langfuse or Arize Phoenix for trace visualization across both Temporal and LLM calls.
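As a rough illustration of treating event history as a trace, the sketch below computes per-activity latency from a list of events. The event shape here is a simplified, hypothetical one (plain dicts with `type`, `activity`, and `at` in seconds), not Temporal's protobuf history format, where a completed event refers back to its scheduled event by ID rather than by activity name.

```python
from typing import Dict, List


def activity_durations(events: List[dict]) -> Dict[str, int]:
    """Pair each 'scheduled' event with its 'completed' event and return seconds elapsed."""
    scheduled_at: Dict[str, int] = {}
    durations: Dict[str, int] = {}
    for event in events:
        name = event["activity"]
        if event["type"] == "scheduled":
            scheduled_at[name] = event["at"]
        elif event["type"] == "completed":
            durations[name] = event["at"] - scheduled_at.pop(name)
    return durations


# Hypothetical timings for the three activities of the research workflow.
sample_events = [
    {"type": "scheduled", "activity": "search_web", "at": 0},
    {"type": "completed", "activity": "search_web", "at": 2},
    {"type": "scheduled", "activity": "call_llm", "at": 2},
    {"type": "completed", "activity": "call_llm", "at": 14},
    {"type": "scheduled", "activity": "generate_report", "at": 14},
    {"type": "completed", "activity": "generate_report", "at": 15},
]

durations = activity_durations(sample_events)
print(durations)  # {'search_web': 2, 'call_llm': 12, 'generate_report': 1}
```

Measured from scheduling to completion, these durations include queue time and any retries, so they reflect what the caller experienced rather than pure execution time.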
Key Takeaways
- Durable execution is table stakes for production agents: without it, any infrastructure failure silently destroys agent progress.
- Temporal’s workflow-activity boundary maps naturally to AI agents: workflows are the deterministic orchestration plan, activities are the LLM calls and tool invocations.
- Wrap existing frameworks: LangGraph, OpenAI Agents SDK, or custom agents can all be wrapped in a Temporal activity without rewriting the underlying logic.
- Human-in-the-loop is free with Temporal signals: workflow.wait_condition() pauses without consuming resources and survives worker restarts.
- DBOS is viable for Postgres-native deployments but Temporal leads on throughput, SDK maturity, and ecosystem integrations.
- Observability is built-in: every workflow’s event history is a ready-made trace for debugging and evaluation.
Further Reading
- Temporal AI Cookbook
- DBOS AI Quickstart
- OpenTelemetry GenAI Semantic Conventions
- LangGraph Temporal Integration
- Building Production-Ready AI Agents in 2026 (MLflow)

