Managing Rate Limits and Token Budgets in Production AI Agents

Rate limits are the first thing that breaks when you take an agent from a notebook to production. A single agent calling an LLM API against a 60 RPM per-key limit is fine, but 20 agents sharing that key start hitting 429s within seconds. Token spend blows past projections because no one accounted for retries, tool-call overhead, or chain-of-thought expansion.

This guide covers patterns for rate limiting, quota management, and budget enforcement in agent systems that run unattended. You’ll build a token-aware rate limiter, a cost tracker with per-call attribution, and a circuit breaker that routes around exhausted providers.

Prerequisites

  • Python 3.11+ with asyncio (agents typically run concurrently)
  • pip install aiohttp for async HTTP calls (plus pip install redis if you use the Redis-backed queue in Step 5)
  • One or more LLM API keys (OpenAI, Anthropic, or any OpenAI-compatible provider)
  • A Redis instance (optional but recommended for distributed rate limiting)

Step 1: Token-Aware Token Bucket Rate Limiter

Most rate limiters count requests per minute [1]. That’s insufficient for agents because one request might consume 200 tokens (a simple classification) and another 32,000 (a full conversation replay). A token-aware rate limiter tracks both request count and token consumption [2].

import asyncio
import time
from collections import deque
from dataclasses import dataclass, field

@dataclass
class TokenBucket:
    """Rate limiter that tracks both requests and tokens.

    Despite the name, this keeps a sliding-window log of recent calls
    rather than a refilling bucket.
    """
    rpm_limit: int           # Max requests per minute
    tpm_limit: int           # Max tokens per minute
    window: float = 60.0     # Sliding window in seconds
    _requests: deque = field(default_factory=deque)  # Timestamps of admitted calls
    _tokens: deque = field(default_factory=deque)    # Token estimates, same order

    async def acquire(self, estimated_tokens: int = 0) -> float:
        """Block until a slot is available. Returns total wait time in seconds."""
        if estimated_tokens > self.tpm_limit:
            # This request could never fit in the window; fail instead of spinning
            raise ValueError("estimated_tokens exceeds tpm_limit")

        waited = 0.0
        while True:
            now = time.monotonic()
            cutoff = now - self.window

            # Purge entries that have aged out of the window
            while self._requests and self._requests[0] < cutoff:
                self._requests.popleft()
                self._tokens.popleft()

            over_rpm = len(self._requests) >= self.rpm_limit
            over_tpm = sum(self._tokens) + estimated_tokens > self.tpm_limit
            if not (over_rpm or over_tpm):
                self._requests.append(now)
                self._tokens.append(estimated_tokens)
                return waited

            # Sleep until the oldest entry expires, then re-check both limits
            wait = max(self._requests[0] + self.window - now, 0.01)
            await asyncio.sleep(wait)
            waited += wait

The key difference from a simple RPM limiter: we estimate token consumption before sending the request and block if the window's total would exceed the per-minute token quota. Output size is unknown until the response finishes (streaming included), so estimate conservatively by reserving the request's full output cap, the max_tokens value used by the client in Step 4.
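As a rough illustration of estimating before dispatch, the sketch below reserves an approximate prompt size plus the full output cap. The function name estimate_request_tokens and the 4-characters-per-token heuristic are assumptions for illustration; a real tokenizer for your model will be more accurate.

```python
def estimate_request_tokens(messages: list[dict], max_output_tokens: int,
                            chars_per_token: int = 4) -> int:
    """Conservative reservation: approximate prompt size plus the output cap."""
    # Treat missing or None content as an empty string
    prompt_chars = sum(len(m.get("content") or "") for m in messages)
    # Ceiling division so short prompts never estimate to zero tokens
    prompt_tokens = -(-prompt_chars // chars_per_token)
    return prompt_tokens + max_output_tokens


# Hypothetical request: two short messages, output capped at 256 tokens
messages = [
    {"role": "system", "content": "You classify support tickets."},
    {"role": "user", "content": "My invoice shows the wrong amount."},
]
reserved = estimate_request_tokens(messages, max_output_tokens=256)
print(reserved)
```

Reserving the full output cap over-counts most calls, which wastes some quota but keeps the limiter on the safe side of the provider's own accounting.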

Step 2: Per-Call Cost Tracking with Attribution

Without attribution you can’t answer “which agent workflow is burning budget.” This tracker wraps any OpenAI-compatible API and records cost per call with a workflow identifier.

import json
import time
from dataclasses import dataclass, asdict

# Cost per 1K tokens (pricing as of mid-2026; adjust for your provider) [3]
MODEL_PRICING = {
    "gpt-4o": {"input": 0.0025, "output": 0.010},
    "gpt-4o-mini": {"input": 0.00015, "output": 0.0006},
    "claude-3.5-sonnet": {"input": 0.003, "output": 0.015},
    "claude-3-haiku": {"input": 0.00025, "output": 0.00125},
}

@dataclass
class CostRecord:
    workflow_id: str
    model: str
    input_tokens: int
    output_tokens: int
    cost: float
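    timestamp: float = 0.0   # 0.0 means "stamp with the current time"

    def __post_init__(self):
        # Only stamp when the caller did not supply a timestamp
        if not self.timestamp:
            self.timestamp = time.time()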

class CostTracker:
    """Per-workflow cost accumulator. No locking: use it from a single event loop."""

    def __init__(self, budget_usd: float = 0.0):
        self._records: list[CostRecord] = []
        self._budget = budget_usd
        self._total = 0.0

    def record(self, workflow_id: str, model: str,
               input_tokens: int, output_tokens: int):
        pricing = MODEL_PRICING.get(model, {"input": 0.002, "output": 0.008})
        cost = (input_tokens / 1000 * pricing["input"] +
                output_tokens / 1000 * pricing["output"])
        record = CostRecord(workflow_id, model, input_tokens, output_tokens, cost)
        self._records.append(record)
        self._total += cost

    def check_budget(self) -> bool:
        """Returns True if budget is exhausted."""
        if self._budget <= 0:
            return False
        return self._total >= self._budget

    def report(self) -> dict:
        return {
            "total_cost": round(self._total, 4),
            "budget": self._budget,
            "exhausted": self.check_budget(),
            "by_workflow": self._group_by_workflow(),
        }

    def _group_by_workflow(self) -> dict:
        groups = {}
        for r in self._records:
            groups.setdefault(r.workflow_id, []).append(asdict(r))
        return groups

This tracker lets you attach a workflow_id to every agent run — your nightly batch summarizer, your support ticket responder, your code review agent — and see exactly how much each costs. The check_budget() method gives a hard ceiling: once it returns True, you drop to fallback models or queue until next cycle.
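To make the arithmetic concrete, here is a minimal standalone sketch of the per-call cost formula and a per-workflow rollup. The helper names call_cost and totals_by_workflow, the workflow IDs, and the token counts are hypothetical; the prices are the per-1K gpt-4o figures from the pricing table above.

```python
from collections import defaultdict

# Per-1K-token prices copied from the gpt-4o row of MODEL_PRICING above
PRICE_PER_1K = {"input": 0.0025, "output": 0.010}


def call_cost(input_tokens: int, output_tokens: int) -> float:
    """Cost in USD for one call: tokens / 1000 * price per 1K, summed."""
    return (input_tokens / 1000 * PRICE_PER_1K["input"]
            + output_tokens / 1000 * PRICE_PER_1K["output"])


def totals_by_workflow(calls: list[tuple[str, int, int]]) -> dict[str, float]:
    """Sum cost per workflow_id from (workflow_id, input, output) tuples."""
    totals: dict[str, float] = defaultdict(float)
    for workflow_id, input_tokens, output_tokens in calls:
        totals[workflow_id] += call_cost(input_tokens, output_tokens)
    return dict(totals)


# Hypothetical calls: (workflow_id, input_tokens, output_tokens)
calls = [
    ("nightly-summarizer", 12_000, 800),
    ("nightly-summarizer", 8_000, 400),
    ("ticket-responder", 2_000, 1_000),
]
for workflow_id, total in sorted(totals_by_workflow(calls).items()):
    print(f"{workflow_id}: ${total:.4f}")
```

A rollup like this answers the "which workflow is burning budget" question directly, whereas report() returns the raw records grouped by workflow and leaves the summing to the caller.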

Step 3: Provider Failover with Circuit Breaker

When one provider hits its rate limit or exhausts quota, the agent should fail over — not crash. This circuit breaker wraps multiple providers and routes around degraded ones.

import asyncio
import random
from dataclasses import dataclass
from enum import Enum, auto

class CircuitState(Enum):
    CLOSED = auto()      # Normal operation
    OPEN = auto()        # Failing — skip this provider
    HALF_OPEN = auto()   # Testing recovery

@dataclass
class ProviderEndpoint:
    name: str
    api_key: str
    base_url: str
    rpm_limit: int
    weight: int = 1       # Higher = more traffic (for cost optimization)

class MultiProviderRouter:
    """Weighted random routing with per-provider circuit breakers."""

    def __init__(self, endpoints: list[ProviderEndpoint]):
        self._endpoints = endpoints
        self._state: dict[str, CircuitState] = {
            e.name: CircuitState.CLOSED for e in endpoints
        }
        self._failure_count: dict[str, int] = {e.name: 0 for e in endpoints}
        self._threshold = 3       # Failures before opening circuit
        self._cooldown = 60.0     # Seconds before testing recovery

    def pick(self) -> ProviderEndpoint:
        """Pick an endpoint that's not in OPEN state."""
        candidates = [
            e for e in self._endpoints
            if self._state[e.name] != CircuitState.OPEN
        ]
        if not candidates:
            raise RuntimeError("All providers are in OPEN state")

        # Weighted random selection
        weights = [e.weight for e in candidates]
        return random.choices(candidates, weights=weights, k=1)[0]

    def record_success(self, name: str):
        self._state[name] = CircuitState.CLOSED
        self._failure_count[name] = 0

    def record_failure(self, name: str):
        self._failure_count[name] += 1
        if self._failure_count[name] >= self._threshold:
            self._state[name] = CircuitState.OPEN
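            # Schedule half-open recovery check (requires a running event loop)
            asyncio.get_running_loop().call_later(
                self._cooldown, self._half_open, name
            )

    def _half_open(self, name: str):
        # Only move OPEN -> HALF_OPEN; a success may have closed it already
        if self._state[name] == CircuitState.OPEN:
            self._state[name] = CircuitState.HALF_OPEN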

The router uses weighted random selection, so you can send more traffic to cheaper providers (higher weight) and less to expensive ones. When a provider's circuit opens, the router skips it until the cooldown moves it to half-open, and selects among the remaining candidates.
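One alternative to scheduling a timer is to record when the circuit opened and derive the state on read. The sketch below is a standalone illustration under that assumption; the class name TimedBreaker and the injectable clock parameter are hypothetical and not part of the router above.

```python
import time
from typing import Callable


class TimedBreaker:
    """Circuit breaker whose state is derived from timestamps, not timers."""

    def __init__(self, threshold: int = 3, cooldown: float = 60.0,
                 clock: Callable[[], float] = time.monotonic):
        self._threshold = threshold
        self._cooldown = cooldown
        self._clock = clock          # Injectable so tests can control time
        self._failures = 0
        self._opened_at: float | None = None

    def state(self) -> str:
        if self._opened_at is None:
            return "closed"
        if self._clock() - self._opened_at >= self._cooldown:
            return "half_open"       # Cooldown elapsed: allow a trial request
        return "open"

    def allow_request(self) -> bool:
        return self.state() != "open"

    def record_success(self) -> None:
        self._failures = 0
        self._opened_at = None

    def record_failure(self) -> None:
        self._failures += 1
        # Reaching the threshold opens the circuit; any failure while it is
        # already open or half-open restarts the cooldown
        if self._failures >= self._threshold or self._opened_at is not None:
            self._opened_at = self._clock()


# Drive the breaker with a fake clock to show the transitions
now = [0.0]
breaker = TimedBreaker(threshold=3, cooldown=60.0, clock=lambda: now[0])
for _ in range(3):
    breaker.record_failure()
print(breaker.state())   # open
now[0] = 61.0
print(breaker.state())   # half_open
breaker.record_success()
print(breaker.state())   # closed
```

Deriving state from a timestamp avoids depending on a running event loop and makes the transitions easy to test with a fake clock.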

Step 4: Putting It Together — A Production Agent Client

Here’s how the pieces compose into a concrete agent HTTP client:

import aiohttp
import asyncio

class BudgetAwareAgentClient:
    """Agent HTTP client with rate limiting, cost tracking, and failover."""

    def __init__(self, bucket: TokenBucket, tracker: CostTracker,
                 router: MultiProviderRouter):
        self._bucket = bucket
        self._tracker = tracker
        self._router = router
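        self._session = aiohttp.ClientSession()  # Construct inside a running event loop

    async def close(self):
        """Close the HTTP session when the client is no longer needed."""
        await self._session.close()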

    async def chat_completion(self, messages: list[dict],
                              model: str, workflow_id: str,
                              max_tokens: int = 4096) -> dict:
        # Check budget before making the call [4]
        if self._tracker.check_budget():
            raise BudgetExhaustedError(
                f"Budget ${self._tracker.report()['budget']:.2f} exhausted"
            )

        # Acquire rate limiter slot with token estimate
        token_estimate = sum(len(m.get("content", "")) for m in messages) // 4
        await self._bucket.acquire(token_estimate + max_tokens)

        # Pick the best available provider
        endpoint = self._router.pick()
        payload = {
            "model": model,
            "messages": messages,
            "max_tokens": max_tokens,
        }

        try:
            async with self._session.post(
                f"{endpoint.base_url}/chat/completions",
                json=payload,
                headers={"Authorization": f"Bearer {endpoint.api_key}"},
            ) as resp:
                if resp.status == 429:
                    self._router.record_failure(endpoint.name)
                    # Honor Retry-After when it is a number of seconds [5];
                    # fall back to 5s for a missing or HTTP-date value
                    header = resp.headers.get("Retry-After", "")
                    retry_after = int(header) if header.isdigit() else 5
                    await asyncio.sleep(retry_after)
                    return await self.chat_completion(
                        messages, model, workflow_id, max_tokens
                    )
                resp.raise_for_status()
                data = await resp.json()
                self._router.record_success(endpoint.name)

                # Track cost
                usage = data.get("usage", {})
                self._tracker.record(
                    workflow_id=workflow_id,
                    model=model,
                    input_tokens=usage.get("prompt_tokens", 0),
                    output_tokens=usage.get("completion_tokens", 0),
                )
                return data

        except (aiohttp.ClientError, asyncio.TimeoutError):
            self._router.record_failure(endpoint.name)
            raise

class BudgetExhaustedError(Exception):
    pass

This client checks budget first (fast-fail before spending tokens on the request), then acquires the rate limiter, routes to an available provider, handles 429s with retry-after, and records cost on success. If the provider is down, it raises — your orchestrating agent catches the exception and decides whether to retry, escalate, or queue for later.

Step 5: Queuing Requests for Batch Processing

When all providers are rate-limited, queue requests instead of dropping them. A Redis-backed queue lets multiple agent instances share work without hammering APIs.

import json
import redis.asyncio as aioredis

class AgentRequestQueue:
    """Redis-backed FIFO for delayed agent requests."""

    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self._redis: aioredis.Redis | None = None
        self._redis_url = redis_url

    async def connect(self):
        self._redis = await aioredis.from_url(self._redis_url)

    async def enqueue(self, workflow_id: str, payload: dict):
        await self._redis.rpush(
            f"agent_queue:{workflow_id}",
            json.dumps(payload)
        )

    async def dequeue(self, workflow_id: str,
                      batch_size: int = 10) -> list[dict]:
        # LPOP with a count removes and returns the batch atomically, so two
        # consumers never receive the same item (requires Redis 6.2+)
        items = await self._redis.lpop(
            f"agent_queue:{workflow_id}", batch_size
        )
        return [json.loads(item) for item in items or []]

    async def size(self, workflow_id: str) -> int:
        return await self._redis.llen(f"agent_queue:{workflow_id}")

Use the queue for non-urgent work — nightly report generation, document indexing, batch classification. Urgent requests (user-facing chat) should use the failover path instead.
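A worker that drains the queue might look like the sketch below. To keep it runnable without a Redis server, it uses a hypothetical in-memory stand-in (InMemoryQueue) that mirrors the enqueue, dequeue, and size methods of AgentRequestQueue; drain and the handler callback are likewise illustrative names.

```python
import asyncio
from collections import defaultdict, deque
from typing import Awaitable, Callable


class InMemoryQueue:
    """Hypothetical stand-in with the same method names as AgentRequestQueue."""

    def __init__(self) -> None:
        self._queues: dict[str, deque] = defaultdict(deque)

    async def enqueue(self, workflow_id: str, payload: dict) -> None:
        self._queues[workflow_id].append(payload)

    async def dequeue(self, workflow_id: str, batch_size: int = 10) -> list[dict]:
        q = self._queues[workflow_id]
        return [q.popleft() for _ in range(min(batch_size, len(q)))]

    async def size(self, workflow_id: str) -> int:
        return len(self._queues[workflow_id])


async def drain(queue, workflow_id: str,
                handler: Callable[[dict], Awaitable[None]],
                batch_size: int = 10) -> int:
    """Process queued payloads in FIFO batches until the queue is empty."""
    processed = 0
    while True:
        batch = await queue.dequeue(workflow_id, batch_size)
        if not batch:
            return processed
        for payload in batch:
            await handler(payload)   # e.g. call the agent client here
            processed += 1


async def main() -> list[int]:
    queue = InMemoryQueue()
    for i in range(25):
        await queue.enqueue("nightly-report", {"doc_id": i})

    seen: list[int] = []

    async def handler(payload: dict) -> None:
        seen.append(payload["doc_id"])

    await drain(queue, "nightly-report", handler, batch_size=10)
    return seen


seen_ids = asyncio.run(main())
print(len(seen_ids), seen_ids[:3])
```

Note that items are removed before they are processed, so a crash mid-batch loses the rest of that batch. Acknowledging items only after they succeed would need a different design.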

Key Takeaways

  1. Token-aware limiting beats RPM limiting: A single large-context request can exhaust your per-minute token quota faster than 100 small ones. Estimate tokens before dispatching.
  2. Attribute costs by workflow: Without workflow IDs in your cost tracker, you can’t identify which agent workflow is burning most of your budget. Tag every call with a workflow identifier.
  3. Circuit breakers prevent cascading failures: When Provider A is rate-limited, routing all traffic to Provider B can overload it too. Implement automatic failover with cooldowns.
  4. Budget enforcement should be pre-request, not post-hoc: Check the budget before making the API call, not after the response arrives. Failing fast avoids spending tokens on calls you can no longer afford.
  5. Queue, don’t drop, non-urgent work: Rate limits are temporary. A Redis-backed queue lets you absorb traffic spikes without losing work items.