Building Durable AI Agents with Temporal — Crash-Proof Long-Running Workflows

The bottom line: Every AI agent running in production will eventually crash. Network timeouts, API rate limits, pod restarts, and deployment rollouts all terminate in-flight agent runs silently, and an agent that loses state mid-task isn’t production-ready. Temporal’s durable execution model gives your agent a guarantee similar to a database transaction: a run either completes, or it resumes automatically from its last recorded step. This guide walks through six steps with working code, then covers a Postgres-native alternative and observability.


Why Durable Execution Matters for AI Agents

A typical agent workflow involves multiple LLM calls, tool invocations, and state transitions over seconds to minutes. Here’s what happens when any of those steps fails without durability:

| Failure event | Without durability | With durable execution |
| --- | --- | --- |
| LLM API timeout (10% of calls [1]) | Agent loses partial progress, user retries | Worker restarts from last activity |
| Pod killed during scale-down | Entire agent state deleted | Workflow resumes on new worker |
| Code deploy mid-agent-run | Process terminated, state lost | New code picks up at checkpoint |
| Database connection drop | Tool call fails, no retry | Activity retries with backoff |
| 30-minute research task restart | Must redo all LLM calls | Replays from cached results |

Durable execution achieves this through event sourcing: the result of every workflow step is written to a history store before the workflow proceeds. When a crash occurs, a worker reads the event log and replays the deterministic workflow code. Completed activities, including LLM calls, are not executed again during replay because their recorded results are read back from the history, so you don’t pay for them twice [2].

[1] Temporal, “Durable Execution for AI Agents” — temporal.io/ai
[2] OpenTelemetry GenAI Semantic Conventions, “Agent Tracing with Durable Execution” — opentelemetry.io/docs/specs/semconv/gen-ai/
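The replay mechanism described above can be sketched in a few lines of plain Python. This is a toy model, not Temporal's implementation: `ToyHistory` and `run_workflow` are hypothetical names, and the history is an in-memory list rather than a durable store. It shows only the core idea that a recorded result is reused instead of re-running the step.

```python
from typing import Any, Callable


class ToyHistory:
    """Append-only log of completed step results (stand-in for an event history)."""

    def __init__(self) -> None:
        self.events: list[tuple[str, Any]] = []


def run_workflow(history: ToyHistory, steps: list[tuple[str, Callable[[], Any]]]) -> list[Any]:
    """Run steps in order, reusing any result already recorded in the history."""
    results = []
    for index, (name, fn) in enumerate(steps):
        if index < len(history.events):
            # Replay path: the step already completed, so reuse its recorded result.
            recorded_name, recorded_result = history.events[index]
            assert recorded_name == name, "workflow code diverged from history"
            results.append(recorded_result)
            continue
        # First execution: run the step and record the result before moving on.
        result = fn()
        history.events.append((name, result))
        results.append(result)
    return results


calls = {"llm": 0, "report_attempts": 0}


def search() -> list[str]:
    return ["source-a", "source-b"]


def llm() -> str:
    calls["llm"] += 1
    return "synthesis"


def report() -> str:
    calls["report_attempts"] += 1
    if calls["report_attempts"] == 1:
        raise RuntimeError("simulated worker crash")
    return "report"


steps = [("search_web", search), ("call_llm", llm), ("generate_report", report)]
history = ToyHistory()

try:
    run_workflow(history, steps)  # crashes inside generate_report
except RuntimeError:
    pass

final = run_workflow(history, steps)  # search_web and call_llm come from the history
```

After the second run, `calls["llm"]` is still 1: the simulated LLM step ran once, and the retry of the run read its result from the log.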

What You’re Building

A multi-step research agent that:

  1. Receives a research query and retrieves web sources
  2. Calls an LLM to synthesize findings
  3. Generates a structured report
  4. Sends the result back to the caller

The agent runs inside a Temporal workflow, which means it survives infrastructure failures between steps. If the worker crashes during the LLM call in step 2, the workflow resumes at that step once a new worker picks up the task.

The code in this guide is organized as patterns you can adapt to any LangGraph, OpenAI Agents SDK, or custom agent implementation.


Prerequisites

  • Python 3.10+
  • Docker (for local Temporal Server), or a Temporal Cloud account
  • The Temporal CLI, which starts a local development server with: temporal server start-dev
  • OpenAI API key (or any LLM provider supported by your agent framework)
  • Temporal Python SDK: pip install temporalio

Step 1: Setting Up Temporal

Start a local Temporal Server for development:

temporal server start-dev \
  --db-filename /tmp/temporal.db \
  --namespace default

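This runs a single-process development server that persists state to the SQLite file /tmp/temporal.db. Without --db-filename, the dev server keeps state in memory and loses it on restart. For production, you’d run Temporal Server in Docker or use Temporal Cloud.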

Verify it’s running:

temporal operator namespace list
# Expected: default (active)

Install the Python SDK:

pip install temporalio

For LangGraph integration, you’ll also need:

pip install langgraph langchain-openai

Step 2: Defining the Workflow-Activity Boundary

The fundamental pattern in Temporal is Workflows call Activities. Workflows are deterministic — they can’t make network calls directly. Activities are non-deterministic — they’re where LLM calls, API requests, and I/O live.

# activities.py — Non-deterministic operations
from temporalio import activity

@activity.defn
async def search_web(query: str) -> list[dict]:
    """Activity: make an HTTP call. Safe to retry."""
    import httpx
    async with httpx.AsyncClient() as client:
        resp = await client.get(
            "https://api.duckduckgo.com",
            params={"q": query, "format": "json"},
            timeout=15,
        )
        resp.raise_for_status()
        # The Instant Answer API capitalizes its keys ("Results", "RelatedTopics")
        return resp.json().get("Results", [])

@activity.defn
async def call_llm(prompt: str, system: str) -> str:
    """Activity: LLM call. Retries on timeout, cached on replay."""
    from openai import AsyncOpenAI
    client = AsyncOpenAI()
    resp = await client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": system},
            {"role": "user", "content": prompt},
        ],
        timeout=30,
    )
    return resp.choices[0].message.content or ""

@activity.defn
async def generate_report(synthesis: str, format: str = "markdown") -> str:
    """Activity: format the final output."""
    return f"# Research Report\n\n{synthesis}\n\n---\n*Generated by durable agent*"

Key rule: Activities can be retried independently. Temporal replays the workflow up to the last completed activity, then re-executes only the failed one [3].
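Why workflow code has to be deterministic is easier to see with a small model of replay. The sketch below is a simplification under stated assumptions: `bad_plan`, `good_plan`, `replay`, and the `USE_RERANKER` environment variable are all hypothetical, and a "plan" here is just a list of activity names rather than Temporal's real command stream. It compares the steps recorded on first execution with the steps the code produces when it is run again.

```python
import os


class NonDeterminismError(Exception):
    """Raised when replayed code schedules different steps than the history shows."""


def bad_plan(query: str) -> list[str]:
    # Reads process state inside workflow logic: the value can change between runs.
    steps = ["search_web"]
    if os.environ.get("USE_RERANKER") == "1":
        steps.append("rerank")
    steps.append("call_llm")
    return steps


def good_plan(query: str, use_reranker: bool) -> list[str]:
    # The flag arrives as a workflow input, so it is identical on every replay.
    steps = ["search_web"]
    if use_reranker:
        steps.append("rerank")
    steps.append("call_llm")
    return steps


def replay(recorded: list[str], replayed: list[str]) -> None:
    """Compare the steps recorded in history with the steps the code produces now."""
    if recorded != replayed:
        raise NonDeterminismError(f"history has {recorded}, code produced {replayed}")


# First execution happens on a worker with the flag enabled.
os.environ["USE_RERANKER"] = "1"
recorded_bad = bad_plan("durable agents")
recorded_good = good_plan("durable agents", use_reranker=True)

# Replay happens on a new worker where the flag is off.
os.environ["USE_RERANKER"] = "0"
replay(recorded_good, good_plan("durable agents", use_reranker=True))  # passes

try:
    replay(recorded_bad, bad_plan("durable agents"))
    bad_replay_failed = False
except NonDeterminismError:
    bad_replay_failed = True
```

The same reasoning applies to clocks, random numbers, and network reads: anything that can differ between runs belongs in an activity or in the workflow's inputs, where its value is recorded once.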


Step 3: Writing the Durable Workflow

The workflow is the orchestration logic — it’s deterministic and must produce the same output given the same inputs, so Temporal can replay it safely.

# workflows.py — Deterministic orchestration
from datetime import timedelta
from temporalio import workflow
from temporalio.common import RetryPolicy

# Workflow code runs in a sandbox that re-imports modules for each run.
# Passing the activities module through skips that re-import. The workflow
# only uses these functions as typed references; the worker executes them.
with workflow.unsafe.imports_passed_through():
    from activities import search_web, call_llm, generate_report

@workflow.defn
class ResearchAgentWorkflow:
    @workflow.run
    async def run(self, query: str) -> str:
        # Step 1: Search (an activity, because it makes a network call)
        results = await workflow.execute_activity(
            search_web,
            query,
            start_to_close_timeout=timedelta(seconds=20),
            retry_policy=RetryPolicy(
                initial_interval=timedelta(seconds=1),
                maximum_interval=timedelta(seconds=10),
                maximum_attempts=3,
            ),
        )

        # Step 2: Synthesize (another activity: the LLM call)
        synthesis = await workflow.execute_activity(
            call_llm,
            args=[
                f"Sources: {results}\n\nSummarize findings for: {query}",
                "You are a research assistant. Synthesize sources into concise findings.",
            ],
            start_to_close_timeout=timedelta(seconds=60),
            retry_policy=RetryPolicy(
                initial_interval=timedelta(seconds=2),
                maximum_attempts=5,
            ),
        )

        # Step 3: Generate report — final Activity
        report = await workflow.execute_activity(
            generate_report,
            synthesis,
            start_to_close_timeout=timedelta(seconds=10),
        )

        return report

Notice that each execute_activity call sets an explicit start_to_close_timeout, and the first two also set a retry policy (the third falls back to the default policy). An attempt fails when the activity raises an error or its timeout fires, and Temporal then retries it according to the policy [3].

[3] Temporal Python SDK Docs, “Activity Execution” — docs.temporal.io/develop/python/activity-execution
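To see what the retry policies in this step imply in practice, the sketch below computes the wait before each retry and a rough worst-case duration per activity. It assumes the SDK's documented defaults of a backoff coefficient of 2.0 and a maximum interval of 100 times the initial interval when none is given. `retry_delays` and `worst_case` are hypothetical helpers, not part of the Temporal SDK, and the estimate ignores queueing time before an attempt starts.

```python
from datetime import timedelta
from typing import Optional


def retry_delays(
    initial_interval: timedelta,
    maximum_attempts: int,
    backoff_coefficient: float = 2.0,
    maximum_interval: Optional[timedelta] = None,
) -> list[timedelta]:
    """Return the wait before each retry (there are maximum_attempts - 1 retries)."""
    cap = maximum_interval if maximum_interval is not None else initial_interval * 100
    delays = []
    for retry in range(maximum_attempts - 1):
        delay = initial_interval * (backoff_coefficient ** retry)
        delays.append(min(delay, cap))
    return delays


def worst_case(start_to_close: timedelta, delays: list[timedelta]) -> timedelta:
    """Upper bound if every attempt runs until its timeout and then fails."""
    attempts = len(delays) + 1
    return start_to_close * attempts + sum(delays, timedelta())


# search_web: initial 1s, capped at 10s, 3 attempts, 20s timeout per attempt
search_delays = retry_delays(
    timedelta(seconds=1), 3, maximum_interval=timedelta(seconds=10)
)
search_worst = worst_case(timedelta(seconds=20), search_delays)

# call_llm: initial 2s, 5 attempts, 60s timeout per attempt
llm_delays = retry_delays(timedelta(seconds=2), 5)
llm_worst = worst_case(timedelta(seconds=60), llm_delays)
```

Under these assumptions the search activity can take up to 63 seconds before it fails for good, and the LLM activity up to 330 seconds, which is worth knowing when you size the timeout a caller waits on.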


Step 4: Running the Worker

The worker connects to Temporal Server and polls for workflow tasks:

# worker.py
import asyncio
from temporalio.client import Client
from temporalio.worker import Worker
from workflows import ResearchAgentWorkflow
from activities import search_web, call_llm, generate_report

async def main():
    client = await Client.connect("localhost:7233", namespace="default")

    worker = Worker(
        client,
        task_queue="research-agent-queue",
        workflows=[ResearchAgentWorkflow],
        activities=[search_web, call_llm, generate_report],
    )

    print("Worker started — waiting for workflow tasks...")
    await worker.run()

if __name__ == "__main__":
    asyncio.run(main())

Start the worker in one terminal:

python worker.py

In another terminal, start a workflow:

# client.py
import asyncio
from temporalio.client import Client
from workflows import ResearchAgentWorkflow

async def main():
    client = await Client.connect("localhost:7233", namespace="default")

    handle = await client.start_workflow(
        ResearchAgentWorkflow.run,
        "LLM agent orchestration patterns in production 2026",
        id="research-001",
        task_queue="research-agent-queue",
    )

    result = await handle.result()
    print(f"Report:\n{result}")

if __name__ == "__main__":
    asyncio.run(main())

Then run it:

python client.py

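Test durability: Kill the worker process mid-execution, then restart it. The workflow resumes after the last completed activity. Results already recorded in the history are not recomputed, so no data is lost; only an activity that was in flight when the worker died runs again, once its timeout expires.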


Step 5: Wrapping LangGraph in a Temporal Activity

If you already have a LangGraph agent, you don’t need to rewrite it. Wrap the entire graph execution in a single Temporal activity:

# langgraph_activity.py
from temporalio import activity
from langgraph.graph import StateGraph, MessagesState
from langchain_openai import ChatOpenAI
from langgraph.checkpoint.memory import MemorySaver

@activity.defn
async def run_langgraph_agent(query: str) -> dict:
    """Wraps an entire LangGraph agent run as a single Temporal activity."""
    model = ChatOpenAI(model="gpt-4o", temperature=0)

    # Build a simple one-node graph
    builder = StateGraph(MessagesState)

    async def call_model(state: MessagesState):
        response = await model.ainvoke(state["messages"])
        return {"messages": [response]}

    builder.add_node("agent", call_model)
    builder.set_entry_point("agent")
    builder.set_finish_point("agent")  # the graph ends after this node
    graph = builder.compile(checkpointer=MemorySaver())

    # Run with config
    config = {"configurable": {"thread_id": f"langgraph-{activity.info().activity_id}"}}
    result = await graph.ainvoke(
        {"messages": [("user", query)]},
        config,
    )
    return {"result": result["messages"][-1].content}

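This pattern adds Temporal’s durability with very little extra code: the LangGraph run becomes a single atomic step in the larger workflow. If the worker crashes mid-graph, the entire activity retries from scratch, because the MemorySaver checkpointer lives in process memory and does not survive the crash. Results of activities that already completed stay in the workflow’s event history, so the workflow’s overall progress is not lost [4].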

For finer-grained durability, split the LangGraph into multiple activities — one per graph node — and orchestrate them from the Temporal workflow. This costs more in event history but gives you per-node checkpointing.

[4] Temporal, “LangGraph Integration” — docs.temporal.io/develop/python/integrations/langgraph


Step 6: Human-in-the-Loop with Signals

Durable execution enables proper human-in-the-loop patterns. When the agent needs approval, it pauses and waits for a signal — surviving a worker crash while waiting:

import asyncio
from datetime import timedelta
from temporalio import workflow

# Pass the activities module through the workflow sandbox
with workflow.unsafe.imports_passed_through():
    from activities import search_web, call_llm, generate_report

@workflow.defn
class ApprovalGateWorkflow:
    def __init__(self) -> None:
        self._approved = False  # True once any decision has arrived
        self._decision: str | None = None

    @workflow.signal
    def approve(self, decision: str) -> None:
        """Signal handler: called externally with "approve" or "reject"."""
        self._approved = True
        self._decision = decision

    @workflow.query
    def status(self) -> dict:
        """Query handler: check status without modifying state."""
        return {"approved": self._approved, "decision": self._decision}

    @workflow.run
    async def run(self, query: str) -> str:
        # Phase 1: Research
        results = await workflow.execute_activity(
            search_web, query,
            start_to_close_timeout=timedelta(seconds=20),
        )
        # Multiple activity arguments must be passed with args=[...]
        synthesis = await workflow.execute_activity(
            call_llm,
            args=[f"Sources: {results}\n\nQuery: {query}", "Research assistant."],
            start_to_close_timeout=timedelta(seconds=60),
        )

        # Phase 2: Wait for a human decision; the wait survives worker crashes
        workflow.logger.info("Waiting for human approval...")
        try:
            await workflow.wait_condition(
                lambda: self._approved,
                timeout=timedelta(hours=24),
            )
        except asyncio.TimeoutError:
            # wait_condition raises asyncio.TimeoutError when the timeout elapses
            return "Approval timed out after 24 hours."

        if self._decision == "reject":
            return "Research rejected by human reviewer."

        # Phase 3: Generate report (only after approval)
        report = await workflow.execute_activity(
            generate_report, synthesis,
            start_to_close_timeout=timedelta(seconds=10),
        )
        return report

Send the approval signal from your UI:

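# signal_client.py
import asyncio
from temporalio.client import Client
from workflows import ApprovalGateWorkflow  # assumes the class lives in workflows.py

async def send_approval():
    client = await Client.connect("localhost:7233")
    # Must be the ID of a running ApprovalGateWorkflow execution
    handle = client.get_workflow_handle("research-001")
    await handle.signal(ApprovalGateWorkflow.approve, "approve")
    status = await handle.query(ApprovalGateWorkflow.status)
    print(f"Status: {status}")

if __name__ == "__main__":
    asyncio.run(send_approval())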

The key insight: workflow.wait_condition() blocks without consuming worker resources. Temporal persists the intent to wait, and even if the worker restarts, it continues waiting once a new worker picks up the task. The 24-hour timeout ensures the workflow doesn’t hang forever if the human never responds.
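For intuition, here is the same gate written with plain asyncio, as an in-process analogue rather than Temporal code. `ApprovalGate` is a hypothetical class: its `approve` method plays the role of the signal handler, and `wait` plays the role of `workflow.wait_condition()` with a timeout. Unlike the Temporal version, this wait is lost if the process dies, which is exactly the gap durable execution closes.

```python
import asyncio
from typing import Optional


class ApprovalGate:
    """In-memory stand-in for a signal plus a timed wait on a condition."""

    def __init__(self) -> None:
        self._event = asyncio.Event()
        self.decision: Optional[str] = None

    def approve(self, decision: str) -> None:
        """Plays the role of the signal handler."""
        self.decision = decision
        self._event.set()

    async def wait(self, timeout: float) -> str:
        """Block until a decision arrives or the timeout elapses."""
        try:
            await asyncio.wait_for(self._event.wait(), timeout)
        except asyncio.TimeoutError:
            return "timed_out"
        return self.decision or "approve"


async def demo() -> tuple[str, str]:
    # Case 1: the "signal" is delivered shortly after the wait begins.
    approved_gate = ApprovalGate()
    asyncio.get_running_loop().call_later(0.01, approved_gate.approve, "approve")
    approved = await approved_gate.wait(timeout=1.0)

    # Case 2: nobody responds, so the wait ends on the timeout path.
    silent_gate = ApprovalGate()
    expired = await silent_gate.wait(timeout=0.05)
    return approved, expired


outcomes = asyncio.run(demo())
```

Handling the timeout as an explicit outcome, rather than letting the exception propagate, keeps the "human never answered" case a normal result the caller can act on.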


Alternative: DBOS for Postgres-Native Durability

If you’re already running Postgres and prefer a simpler setup, DBOS offers durable execution without a separate server process [5]:

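from dbos import DBOS, DBOSConfiguredInstance

@DBOS.dbos_class()
class DurableAgent(DBOSConfiguredInstance):
    def __init__(self, name: str = "research-agent") -> None:
        # Classes with workflow instance methods register a named instance
        # so DBOS can find the object again during recovery
        super().__init__(config_name=name)
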
    @DBOS.workflow()
    def research_and_report(self, query: str) -> str:
        results = self.search_web(query)
        synthesis = self.call_llm(f"Synthesize: {results}")
        return self.format_report(synthesis)

    @DBOS.step()
    def search_web(self, query: str) -> list[dict]:
        import httpx
        return httpx.get("https://api.example.com/search", params={"q": query}).json()

    @DBOS.step()
    def call_llm(self, prompt: str) -> str:
        # LLM call — automatically cached on replay
        ...

    @DBOS.step()
    def format_report(self, content: str) -> str:
        return f"# Report\n\n{content}"

DBOS stores the workflow event log directly in your Postgres database. The tradeoff: Postgres can become a bottleneck at high throughput, while Temporal’s purpose-built event store handles millions of workflows [5].

| Feature | Temporal | DBOS |
| --- | --- | --- |
| Durability model | Purpose-built event store | Postgres native |
| Setup complexity | Dedicated server required | pip install + existing Postgres |
| Max throughput | Millions of workflows | Tens of thousands (Postgres-bound) |
| SDK maturity | Python SDK v1.x (stable) | Python SDK v0.x (active development) |
| LangGraph integration | Official | Community |
| Self-hosted option | Yes (Docker Compose) | Yes (single process) |

Choose Temporal for production-scale multi-agent systems. Choose DBOS for lightweight deployments where you already have Postgres and don’t want to run another service.

[5] DBOS, “Durable Workflow Quickstart” — docs.dbos.dev/ai/ai-quickstart
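The idea of keeping step results in a relational database can be illustrated with the standard library alone. The sketch below uses sqlite3 as a stand-in for Postgres and is not DBOS's actual schema or API: the `step_outputs` table, `run_step`, and `research_and_report` are hypothetical. Each step's output is stored under a workflow ID and step index, and a rerun of the same workflow ID skips any step that already has a row.

```python
import json
import sqlite3
from typing import Any, Callable


def init(conn: sqlite3.Connection) -> None:
    conn.execute(
        "CREATE TABLE IF NOT EXISTS step_outputs ("
        "workflow_id TEXT, step_index INTEGER, output TEXT, "
        "PRIMARY KEY (workflow_id, step_index))"
    )


def run_step(conn: sqlite3.Connection, workflow_id: str, step_index: int,
             fn: Callable[[], Any]) -> Any:
    """Return the stored output for this step, or run it and store the output."""
    row = conn.execute(
        "SELECT output FROM step_outputs WHERE workflow_id = ? AND step_index = ?",
        (workflow_id, step_index),
    ).fetchone()
    if row is not None:
        return json.loads(row[0])
    result = fn()
    with conn:  # commit the checkpoint before the workflow moves on
        conn.execute(
            "INSERT INTO step_outputs VALUES (?, ?, ?)",
            (workflow_id, step_index, json.dumps(result)),
        )
    return result


counter = {"llm": 0}


def research_and_report(conn: sqlite3.Connection, workflow_id: str, query: str,
                        crash_before_report: bool = False) -> str:
    def fake_llm() -> str:
        counter["llm"] += 1
        return f"findings for {query}"

    run_step(conn, workflow_id, 0, lambda: [{"title": "example source"}])
    synthesis = run_step(conn, workflow_id, 1, fake_llm)
    if crash_before_report:
        raise RuntimeError("simulated crash")
    return run_step(conn, workflow_id, 2, lambda: f"# Report\n\n{synthesis}")


conn = sqlite3.connect(":memory:")
init(conn)

try:
    research_and_report(conn, "wf-1", "durable agents", crash_before_report=True)
except RuntimeError:
    pass

report = research_and_report(conn, "wf-1", "durable agents")  # resumes at step 2
stored_rows = conn.execute("SELECT COUNT(*) FROM step_outputs").fetchone()[0]
```

Every step costs at least one database write in this design, which is a simple way to see why the database itself becomes the throughput ceiling.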


Observability: Tracing Durable Agent Runs

Every Temporal workflow produces a detailed event history — you can inspect it via the Temporal Web UI (http://localhost:8233) or programmatically:

# Trace a workflow's execution (run inside an async function with a connected client)
handle = client.get_workflow_handle("research-001")
history = await handle.fetch_history()

for event in history.events:
    # Events are protobuf messages; HasField reports which attributes are set
    if event.HasField("activity_task_scheduled_event_attributes"):
        print(f"Scheduled: {event.activity_task_scheduled_event_attributes.activity_type.name}")
    if event.HasField("activity_task_completed_event_attributes"):
        print(f"Completed: {event.activity_task_completed_event_attributes.result}")

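For OpenTelemetry integration, Temporal’s Python SDK ships a tracing interceptor that records workflow and activity executions as spans. Temporal does not add gen_ai.* attributes itself; those come from instrumenting your LLM client according to the GenAI semantic conventions [2]:

from temporalio.client import Client
from temporalio.contrib.opentelemetry import TracingInterceptor

# Run inside an async function. Workers created from this client inherit
# the interceptor, so workflow and activity executions become spans.
client = await Client.connect(
    "localhost:7233",
    interceptors=[TracingInterceptor()],
)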

Pair this with Langfuse or Arize Phoenix for trace visualization across both Temporal and LLM calls.


Key Takeaways

  1. Durable execution is table stakes for production agents — without it, any infrastructure failure silently destroys agent progress.

  2. Temporal’s workflow-activity boundary maps naturally to AI agents: workflows are the deterministic orchestration plan, activities are the LLM calls and tool invocations.

  3. Wrap existing frameworks — LangGraph, OpenAI Agents SDK, or custom agents can all be wrapped in a Temporal activity without rewriting the underlying logic.

  4. Human-in-the-loop is free with Temporal signals — workflow.wait_condition() pauses without consuming resources and survives worker restarts.

  5. DBOS is viable for Postgres-native deployments but Temporal leads on throughput, SDK maturity, and ecosystem integrations.

  6. Observability is built-in — every workflow’s event history is a ready-made trace for debugging and evaluation.

