Managing Rate Limits and Token Budgets in Production AI Agents

Rate limits are the first thing that breaks when you take an agent from a notebook to production. A single agent hitting an LLM API at 60 RPM per key is fine — 20 agents sharing one key hit 429s in seconds. Token budgets blow past projected spend because no one accounted for retries, tool call overhead, or chain-of-thought expansion.
This guide covers the patterns for rate-limiting, quota-managing, and budget-enforcing agent systems that run unattended. You’ll build a token-aware rate limiter, a cost tracker with per-call attribution, and a circuit breaker that routes around exhausted providers.
Prerequisites
- Python 3.11+ with asyncio (agents typically run concurrently)
- pip install aiohttp for async HTTP calls
- One or more LLM API keys (OpenAI, Anthropic, or any OpenAI-compatible provider)
- A Redis instance (optional but recommended for distributed rate limiting)
Step 1: Token-Aware Token Bucket Rate Limiter
Most rate limiters count requests per minute [1]. That’s insufficient for agents because one request might consume 200 tokens (a simple classification) and another 32,000 (a full conversation replay). A token-aware rate limiter tracks both request count and token consumption [2].
import asyncio
import time
from collections import deque
from dataclasses import dataclass, field


@dataclass
class TokenBucket:
    """Rate limiter that tracks both requests and tokens per window.

    Despite the name, this keeps a sliding-window log of recent calls
    rather than a classic refilling token bucket.
    """
    rpm_limit: int          # Max requests per minute
    tpm_limit: int          # Max tokens per minute
    window: float = 60.0    # Sliding window in seconds
    # Each entry is (timestamp, estimated_tokens), oldest first
    _calls: deque = field(default_factory=deque)

    async def acquire(self, estimated_tokens: int = 0) -> float:
        """Block until a slot is available. Returns total wait in seconds."""
        if estimated_tokens > self.tpm_limit:
            # Would never fit in any window; fail instead of waiting forever
            raise ValueError("estimated_tokens exceeds tpm_limit")
        waited = 0.0
        while True:
            now = time.monotonic()
            # Purge entries that have left the window
            while self._calls and self._calls[0][0] <= now - self.window:
                self._calls.popleft()
            used = sum(tokens for _, tokens in self._calls)
            # Admit the call only if both the request and token limits allow it
            if (len(self._calls) < self.rpm_limit
                    and used + estimated_tokens <= self.tpm_limit):
                self._calls.append((now, estimated_tokens))
                return waited
            # Sleep until the oldest entry expires, then re-check
            wait = max(self._calls[0][0] + self.window - now, 0.01)
            await asyncio.sleep(wait)
            waited += wait
The key difference from a simple RPM limiter: the caller estimates token consumption before sending the request, and the limiter blocks if the sum would exceed the per-minute token quota. For streaming responses, estimate conservatively by adding the request’s output cap to the prompt estimate (max_tokens in the Step 4 client; some APIs call it max_output_tokens).
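As a rough illustration of the estimate passed to acquire(), the sketch below uses the common approximation of about four characters per token (the same heuristic the Step 4 client uses) and reserves the full output cap. The function name estimate_request_tokens is an assumption for illustration, and the heuristic is only approximate; a real tokenizer such as tiktoken [2] gives more accurate counts.

```python
def estimate_request_tokens(messages: list[dict], max_output_tokens: int) -> int:
    """Conservative token estimate for one chat request.

    Assumes roughly 4 characters per token, which is only a heuristic.
    """
    prompt_chars = sum(len(str(m.get("content", ""))) for m in messages)
    prompt_tokens = -(-prompt_chars // 4)  # Ceiling division: round up
    # Reserve the full output cap, since a streamed reply may use all of it
    return prompt_tokens + max_output_tokens


messages = [
    {"role": "system", "content": "You are a classifier."},
    {"role": "user", "content": "Is this ticket urgent? The site is down."},
]
print(estimate_request_tokens(messages, max_output_tokens=256))
```

Rounding up and reserving the whole output cap both err on the side of over-estimating, which costs some throughput but makes a 429 from the provider less likely.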
Step 2: Per-Call Cost Tracking with Attribution
Without attribution you can’t answer “which agent workflow is burning budget.” This tracker wraps any OpenAI-compatible API and records cost per call with a workflow identifier.
import threading
import time
from dataclasses import dataclass, asdict, field

# Cost per 1K tokens (pricing as of mid-2026; adjust for your provider) [3]
# Keys must match the model string you pass to the API.
MODEL_PRICING = {
    "gpt-4o": {"input": 0.0025, "output": 0.010},
    "gpt-4o-mini": {"input": 0.00015, "output": 0.0006},
    "claude-3.5-sonnet": {"input": 0.003, "output": 0.015},
    "claude-3-haiku": {"input": 0.00025, "output": 0.00125},
}


@dataclass
class CostRecord:
    workflow_id: str
    model: str
    input_tokens: int
    output_tokens: int
    cost: float
    timestamp: float = field(default_factory=time.time)


class CostTracker:
    """Thread-safe per-workflow cost accumulator."""

    def __init__(self, budget_usd: float = 0.0):
        self._records: list[CostRecord] = []
        self._budget = budget_usd       # 0 or less means "no budget limit"
        self._total = 0.0
        self._lock = threading.Lock()   # Guards _records and _total

    def record(self, workflow_id: str, model: str,
               input_tokens: int, output_tokens: int):
        # Unknown models fall back to a placeholder price; replace with your own
        pricing = MODEL_PRICING.get(model, {"input": 0.002, "output": 0.008})
        cost = (input_tokens / 1000 * pricing["input"] +
                output_tokens / 1000 * pricing["output"])
        record = CostRecord(workflow_id, model, input_tokens, output_tokens, cost)
        with self._lock:
            self._records.append(record)
            self._total += cost

    def check_budget(self) -> bool:
        """Returns True if budget is exhausted."""
        if self._budget <= 0:
            return False
        return self._total >= self._budget

    def report(self) -> dict:
        return {
            "total_cost": round(self._total, 4),
            "budget": self._budget,
            "exhausted": self.check_budget(),
            "by_workflow": self._group_by_workflow(),
        }

    def _group_by_workflow(self) -> dict:
        with self._lock:
            records = list(self._records)   # Snapshot so grouping runs unlocked
        groups = {}
        for r in records:
            groups.setdefault(r.workflow_id, []).append(asdict(r))
        return groups
This tracker lets you attach a workflow_id to every agent run — your nightly batch summarizer, your support ticket responder, your code review agent — and see exactly how much each costs. The check_budget() method gives a hard ceiling: once it returns True, you drop to fallback models or queue until next cycle.
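To make the arithmetic concrete, the sketch below prices one call with the same per-1K formula that record() uses and rolls calls up into a total per workflow. The helper names call_cost and totals_by_workflow and the sample workflow IDs are hypothetical; the prices are the gpt-4o figures from the table above.

```python
from collections import defaultdict


def call_cost(input_tokens: int, output_tokens: int,
              input_price: float, output_price: float) -> float:
    """Cost in USD, with prices given per 1K tokens."""
    return input_tokens / 1000 * input_price + output_tokens / 1000 * output_price


def totals_by_workflow(calls: list[tuple[str, float]]) -> dict[str, float]:
    """Sum (workflow_id, cost) pairs into one total per workflow."""
    totals: dict[str, float] = defaultdict(float)
    for workflow_id, cost in calls:
        totals[workflow_id] += cost
    return dict(totals)


# 1,200 prompt tokens and 300 completion tokens at 0.0025 / 0.010 per 1K
cost = call_cost(1200, 300, input_price=0.0025, output_price=0.010)
calls = [
    ("nightly-summarizer", cost),
    ("ticket-responder", cost),
    ("nightly-summarizer", cost),
]
print(round(cost, 6), totals_by_workflow(calls))
```

In this example the call costs about $0.006, split evenly between input and output even though the prompt is four times longer than the completion, which is why output caps matter as much as prompt size when budgeting.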
Step 3: Provider Failover with Circuit Breaker
When one provider hits its rate limit or exhausts quota, the agent should fail over — not crash. This circuit breaker wraps multiple providers and routes around degraded ones.
import asyncio
import random
from dataclasses import dataclass
from enum import Enum, auto


class CircuitState(Enum):
    CLOSED = auto()     # Normal operation
    OPEN = auto()       # Failing; skip this provider
    HALF_OPEN = auto()  # Testing recovery


@dataclass
class ProviderEndpoint:
    name: str
    api_key: str
    base_url: str
    rpm_limit: int
    weight: int = 1     # Higher = more traffic (for cost optimization)


class MultiProviderRouter:
    """Weighted random routing with per-provider circuit breakers."""

    def __init__(self, endpoints: list[ProviderEndpoint]):
        self._endpoints = endpoints
        self._state: dict[str, CircuitState] = {
            e.name: CircuitState.CLOSED for e in endpoints
        }
        self._failure_count: dict[str, int] = {e.name: 0 for e in endpoints}
        self._threshold = 3     # Failures before opening circuit
        self._cooldown = 60.0   # Seconds before testing recovery

    def pick(self) -> ProviderEndpoint:
        """Pick an endpoint that's not in OPEN state."""
        candidates = [
            e for e in self._endpoints
            if self._state[e.name] != CircuitState.OPEN
        ]
        if not candidates:
            raise RuntimeError("All providers are in OPEN state")
        # Weighted random selection
        weights = [e.weight for e in candidates]
        return random.choices(candidates, weights=weights, k=1)[0]

    def record_success(self, name: str):
        self._state[name] = CircuitState.CLOSED
        self._failure_count[name] = 0

    def record_failure(self, name: str):
        self._failure_count[name] += 1
        # Open on reaching the threshold; a failed HALF_OPEN probe reopens
        # immediately because the count is only reset by a success
        if (self._failure_count[name] >= self._threshold
                and self._state[name] != CircuitState.OPEN):
            self._state[name] = CircuitState.OPEN
            # Schedule the half-open recovery check (needs a running event loop)
            asyncio.get_running_loop().call_later(
                self._cooldown, self._half_open, name
            )

    def _half_open(self, name: str):
        # Only move OPEN -> HALF_OPEN; a success may have closed it already
        if self._state[name] == CircuitState.OPEN:
            self._state[name] = CircuitState.HALF_OPEN
The router uses weighted random selection, so you can route more traffic to cheaper models (higher weight) and less to expensive ones. When a provider opens its circuit, the router skips it entirely and tries the next candidate.
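One alternative to scheduling the recovery check on the event loop is to store the time the circuit opened and work out the state lazily whenever it is read. The sketch below shows that variant for a single provider. The class name LazyCircuit and the injectable clock parameter are assumptions for illustration, not part of the router above.

```python
import time
from typing import Callable


class LazyCircuit:
    """Single-provider breaker whose cooldown is checked on read."""

    def __init__(self, threshold: int = 3, cooldown: float = 60.0,
                 clock: Callable[[], float] = time.monotonic):
        self._threshold = threshold
        self._cooldown = cooldown
        self._clock = clock          # Injectable so tests can fake time
        self._failures = 0
        self._opened_at: float | None = None

    def state(self) -> str:
        if self._opened_at is None:
            return "closed"
        if self._clock() - self._opened_at >= self._cooldown:
            return "half_open"       # Cooldown elapsed: allow a probe
        return "open"

    def record_failure(self) -> None:
        self._failures += 1
        # Trip on the threshold; a failed half-open probe also lands here
        # and restarts the cooldown, since the count is still at the threshold
        if self._failures >= self._threshold:
            self._opened_at = self._clock()

    def record_success(self) -> None:
        self._failures = 0
        self._opened_at = None


fake_now = [0.0]
circuit = LazyCircuit(threshold=2, cooldown=30.0, clock=lambda: fake_now[0])
circuit.record_failure()
circuit.record_failure()
print(circuit.state())   # open
fake_now[0] = 31.0
print(circuit.state())   # half_open
```

Because nothing is scheduled, this version works without a running event loop and is easy to test with a fake clock. The trade-off is that the state is only as fresh as the last read.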
Step 4: Putting It Together — A Production Agent Client
Here’s how the pieces compose into a concrete agent HTTP client:
import asyncio

import aiohttp


class BudgetExhaustedError(Exception):
    pass


class BudgetAwareAgentClient:
    """Agent HTTP client with rate limiting, cost tracking, and failover."""

    def __init__(self, bucket: TokenBucket, tracker: CostTracker,
                 router: MultiProviderRouter, max_retries: int = 5):
        self._bucket = bucket
        self._tracker = tracker
        self._router = router
        self._max_retries = max_retries     # Cap on 429 retries per call
        self._session: aiohttp.ClientSession | None = None

    async def close(self):
        if self._session is not None:
            await self._session.close()

    async def chat_completion(self, messages: list[dict],
                              model: str, workflow_id: str,
                              max_tokens: int = 4096) -> dict:
        # Create the session lazily so it binds to the running event loop
        if self._session is None:
            self._session = aiohttp.ClientSession()
        # Rough estimate: about 4 characters per token
        token_estimate = sum(len(str(m.get("content", ""))) for m in messages) // 4
        payload = {
            "model": model,
            "messages": messages,
            "max_tokens": max_tokens,
        }
        for attempt in range(self._max_retries + 1):
            # Check budget before making the call [4]
            if self._tracker.check_budget():
                raise BudgetExhaustedError(
                    f"Budget ${self._tracker.report()['budget']:.2f} exhausted"
                )
            # Acquire rate limiter slot with token estimate
            await self._bucket.acquire(token_estimate + max_tokens)
            # Pick an available provider (weighted random among non-OPEN ones)
            endpoint = self._router.pick()
            try:
                async with self._session.post(
                    f"{endpoint.base_url}/chat/completions",
                    json=payload,
                    headers={"Authorization": f"Bearer {endpoint.api_key}"},
                ) as resp:
                    if resp.status == 429:
                        self._router.record_failure(endpoint.name)
                        # Honor Retry-After when it is a number of seconds,
                        # otherwise fall back to exponential backoff [5]
                        try:
                            delay = float(resp.headers.get("Retry-After", ""))
                        except ValueError:
                            delay = min(2 ** attempt, 60)
                        await asyncio.sleep(delay)
                        continue
                    resp.raise_for_status()
                    data = await resp.json()
            except (aiohttp.ClientError, asyncio.TimeoutError):
                self._router.record_failure(endpoint.name)
                raise
            self._router.record_success(endpoint.name)
            # Track cost
            usage = data.get("usage", {})
            self._tracker.record(
                workflow_id=workflow_id,
                model=model,
                input_tokens=usage.get("prompt_tokens", 0),
                output_tokens=usage.get("completion_tokens", 0),
            )
            return data
        raise RuntimeError(f"Still rate-limited after {self._max_retries} retries")
This client checks budget first (fast-fail before spending tokens on the request), then acquires the rate limiter, routes to an available provider, handles 429s with retry-after, and records cost on success. If the provider is down, it raises — your orchestrating agent catches the exception and decides whether to retry, escalate, or queue for later.
Step 5: Queuing Requests for Batch Processing
When all providers are rate-limited, queue requests instead of dropping them. A Redis-backed queue lets multiple agent instances share work without hammering APIs.
import json

import redis.asyncio as aioredis


class AgentRequestQueue:
    """Redis-backed FIFO for delayed agent requests."""

    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self._redis: aioredis.Redis | None = None
        self._redis_url = redis_url

    async def connect(self):
        self._redis = await aioredis.from_url(self._redis_url)

    async def enqueue(self, workflow_id: str, payload: dict):
        await self._redis.rpush(
            f"agent_queue:{workflow_id}",
            json.dumps(payload)
        )

    async def dequeue(self, workflow_id: str,
                      batch_size: int = 10) -> list[dict]:
        # LPOP with a count removes the whole batch atomically (Redis 6.2+),
        # so two workers never receive the same item. A separate
        # LRANGE + LTRIM pair would race when several consumers share a queue.
        items = await self._redis.lpop(
            f"agent_queue:{workflow_id}", batch_size
        )
        return [json.loads(item) for item in items or []]

    async def size(self, workflow_id: str) -> int:
        return await self._redis.llen(f"agent_queue:{workflow_id}")
Use the queue for non-urgent work — nightly report generation, document indexing, batch classification. Urgent requests (user-facing chat) should use the failover path instead.
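A drain loop for the queued path might look like the sketch below. To keep it runnable without Redis, it uses an in-memory asyncio.Queue as a stand-in for AgentRequestQueue. The names drain_queue and handle are hypothetical; a real worker would call dequeue() and the Step 4 client instead.

```python
import asyncio
from typing import Awaitable, Callable


async def drain_queue(queue: asyncio.Queue,
                      handle: Callable[[dict], Awaitable[None]],
                      batch_size: int = 10) -> int:
    """Process up to batch_size queued payloads; return how many succeeded."""
    processed = 0
    while processed < batch_size:
        try:
            payload = queue.get_nowait()
        except asyncio.QueueEmpty:
            break
        try:
            await handle(payload)
            processed += 1
        except Exception:
            # Put failed work back (at the tail) so a later pass retries it,
            # and stop this pass rather than hammering a failing provider
            queue.put_nowait(payload)
            break
    return processed


async def main() -> list[dict]:
    queue: asyncio.Queue = asyncio.Queue()
    for i in range(3):
        queue.put_nowait({"doc_id": i})
    seen: list[dict] = []

    async def handle(payload: dict) -> None:
        seen.append(payload)

    await drain_queue(queue, handle, batch_size=2)
    return seen


print(asyncio.run(main()))
```

Stopping the pass on the first failure is a deliberate choice in this sketch: if the provider is still rate-limited, the remaining items would most likely fail too.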
Key Takeaways
- Token-aware limiting beats RPM limiting: a single large-context request can consume more of your per-minute token quota than 100 small ones. Estimate tokens before dispatching.
- Attribute costs by workflow: without workflow IDs in your cost tracker, you can’t identify which agent workflow is burning most of your budget. Tag every call with a workflow identifier.
- Circuit breakers prevent cascading failures: when Provider A is rate-limited, blindly shifting all traffic to Provider B can overload it too. Implement automatic failover with cooldowns.
- Budget enforcement should be pre-request, not post-hoc: check the budget before making the API call, not after the response arrives. Failing fast avoids spending tokens once the ceiling has been reached.
- Queue, don’t drop, non-urgent work: rate limits are temporary. A Redis-backed queue lets you absorb traffic spikes without losing work items.
Related Tools / Further Reading
- OpenAI Rate Limits Documentation — Official guidance on RPM/TPM tiers [1]
- Token Counting with tiktoken — Accurate token estimation before API calls [2]
- OpenAI Platform Pricing — Current per-model token pricing [3]
- Preventing Cost Overruns with API Budgets — Budget enforcement patterns [4]
- Anthropic API Rate Limits — Usage limits and retry-after guidance [5]
- Circuit Breaker Pattern (Microsoft) — The distributed systems pattern that inspired Step 3
- NiteAgent: Multi-Provider LLM Router/Fallback Guide — Deeper dive on provider routing strategies
- NiteAgent: AI Agent Cost Optimization 2026 — Broader cost-reduction patterns beyond rate limiting

