Securing LLM API Endpoints in Production
Protecting your LLM APIs from abuse, token exhaustion, and unauthorized access with practical rate limiting and authentication patterns.
Your LLM API is one of the most expensive endpoints in your infrastructure. A single malicious user can rack up thousands of dollars in API costs, exhaust your rate limits, or abuse your service for spam generation. Securing these endpoints isn't optional—it's essential for survival.
Why LLM APIs Are Different
Traditional API security focuses on preventing unauthorized access and data breaches. LLM APIs have those concerns plus unique challenges:
- Cost amplification: A single request can cost $0.10 or more in token processing
- Compute intensity: Each request consumes significant GPU resources
- Output unpredictability: Responses vary wildly in length and cost
- Abuse potential: Your LLM can be weaponized for spam, phishing, or harmful content generation
# Per-request cost of a conventional API endpoint versus an LLM endpoint.
traditional_api_cost = 0.0001  # USD per request
llm_api_cost = 0.10  # USD per request, 1000x the traditional figure

# What a burst of 10,000 malicious requests costs in each case.
traditional_damage = traditional_api_cost * 10000  # $1
llm_damage = llm_api_cost * 10000  # $1,000

# Attack Vectors
Token Exhaustion
Attackers craft prompts designed to maximize token consumption:
# Token exhaustion, variant 1: ask outright for an enormous response.
exhaustion_prompt = """
Write a comprehensive 10,000 word essay covering every aspect of quantum physics,
including detailed mathematical proofs, historical context, and future implications.
Include extensive code examples in Python, Java, and Rust.
"""

# Token exhaustion, variant 2: have the model amplify its own output.
recursive_prompt = """
Repeat the following paragraph 100 times, adding one word each time:
"This is a test of the token exhaustion attack methodology."
"""

# Prompt Bombing
Flooding the API with expensive requests to drain budgets or cause denial of service:
import asyncio
import aiohttp
async def prompt_bomb(api_url: str, num_requests: int = 1000):
    """Illustrate the prompt-bombing threat against an unprotected LLM API.

    Builds ``num_requests`` POSTs to ``api_url`` that all carry the same
    expensive prompt, then awaits them together through one shared session.
    """
    expensive_prompt = "Write a detailed 5000 word analysis..."
    async with aiohttp.ClientSession() as session:
        pending = []
        for _ in range(num_requests):
            # One request object per iteration; nothing is sent sequentially.
            pending.append(
                session.post(api_url, json={"prompt": expensive_prompt})
            )
        await asyncio.gather(*pending)
# Without protection: $100+ in API costs in seconds
Credential Stuffing
Using stolen API keys or session tokens to abuse the service:
# Credential stuffing: replay leaked keys against the API and report
# every key the server still accepts.
leaked_keys = ["sk-abc123...", "sk-def456...", "sk-ghi789..."]

for key in leaked_keys:
    response = requests.post(
        "https://your-llm-api.com/generate",
        headers={"Authorization": f"Bearer {key}"},
        json={"prompt": "Test"},
    )
    # Anything other than HTTP 200 means the key was rejected.
    if response.status_code != 200:
        continue
    print(f"Valid key found: {key}")

# Free Tier Abuse
Creating multiple accounts to bypass usage limits:
# Free-tier abuse: script the sign-up flow so that each throwaway account
# brings its own per-user quota.
for i in range(100):
    email = f"throwaway{i}@tempmail.com"
    create_account(email)
    # Each new account yields a fresh key with a full free-tier allowance.
    api_key = get_api_key(email)
# The attacker now has 100x the free-tier limit
Rate Limiting Strategies
Token-Based Rate Limiting
Limit by tokens consumed, not just request count:
from datetime import datetime, timedelta
from collections import defaultdict
class TokenRateLimiter:
    """Sliding one-hour rate limiter that budgets tokens rather than requests.

    Usage is stored per user in ``self.usage`` as a list of
    ``{'tokens': int, 'timestamp': datetime}`` records. All state lives in
    this object, so it is not shared between processes and not persisted.
    """

    # Length of the sliding window in which a usage record counts.
    _WINDOW = timedelta(hours=1)

    def __init__(self, tokens_per_hour: int = 100000):
        """Create a limiter allowing ``tokens_per_hour`` tokens per user."""
        self.tokens_per_hour = tokens_per_hour
        self.usage = defaultdict(list)  # user_id -> list of usage records

    def check_limit(self, user_id: str, estimated_tokens: int) -> dict:
        """Check whether a request of ``estimated_tokens`` fits the user's budget.

        Returns ``{'allowed': True, 'current_usage': n}`` when it fits.
        Otherwise returns ``allowed=False`` together with ``current_usage``,
        ``limit`` and ``retry_after`` (seconds until enough recorded usage
        has expired for this request to fit).

        Raises:
            ValueError: if ``estimated_tokens`` is negative. A negative
                estimate would otherwise lower the computed total and let
                an over-limit user through.
        """
        if estimated_tokens < 0:
            raise ValueError("estimated_tokens must be non-negative")

        self._cleanup_old_usage(user_id)
        # .get() so that merely checking an unknown user stores nothing.
        records = self.usage.get(user_id, ())
        current_usage = sum(record['tokens'] for record in records)

        overflow = current_usage + estimated_tokens - self.tokens_per_hour
        if overflow > 0:
            return {
                'allowed': False,
                'current_usage': current_usage,
                'limit': self.tokens_per_hour,
                'retry_after': self._get_retry_after(user_id, overflow)
            }
        return {'allowed': True, 'current_usage': current_usage}

    def record_usage(self, user_id: str, tokens_used: int):
        """Record actual token usage after a request completes.

        Raises:
            ValueError: if ``tokens_used`` is negative, since that would
                reduce the user's recorded usage.
        """
        if tokens_used < 0:
            raise ValueError("tokens_used must be non-negative")
        self.usage[user_id].append({
            'tokens': tokens_used,
            'timestamp': datetime.now()
        })

    def _cleanup_old_usage(self, user_id: str):
        """Drop the user's records older than the window.

        A user left with no records is removed from ``self.usage`` entirely,
        so checks against arbitrary user IDs do not grow the mapping.
        """
        if user_id not in self.usage:
            return
        cutoff = datetime.now() - self._WINDOW
        fresh = [u for u in self.usage[user_id] if u['timestamp'] > cutoff]
        if fresh:
            self.usage[user_id] = fresh
        else:
            del self.usage[user_id]

    def _get_retry_after(self, user_id: str, tokens_to_free: int = 0) -> int:
        """Return whole seconds until ``tokens_to_free`` tokens have expired.

        Records are consumed oldest first until their tokens add up to
        ``tokens_to_free``; the expiry of the last one consumed is the wait.
        With the default of 0 this is the expiry of the oldest record.
        If all records together free fewer tokens than requested (the
        request alone exceeds the limit), the expiry of the newest record
        is returned. Returns 0 when the user has no records.
        """
        records = sorted(
            self.usage.get(user_id, ()), key=lambda u: u['timestamp']
        )
        if not records:
            return 0

        freed = 0
        last_needed = records[0]
        for record in records:
            last_needed = record
            freed += record['tokens']
            if freed >= tokens_to_free:
                break

        expires = last_needed['timestamp'] + self._WINDOW
        # Ceiling division (negated floor) so that waiting the returned
        # number of seconds is always enough for the record to expire.
        seconds = -((datetime.now() - expires) // timedelta(seconds=1))
        return max(0, seconds)

# Cost-Based Rate Limiting
Limit based on actual dollar cost:
class CostRateLimiter:
    """Per-user daily spending cap measured in dollars.

    Spend is tracked in memory in ``self.daily_spend`` and cleared the first
    time a user is seen on a new calendar day. Dates come from the naive
    ``datetime.now()``, i.e. the server's local clock.
    """

    # Pricing per 1K tokens (example rates)
    PRICING = {
        'gpt-4': {'input': 0.03, 'output': 0.06},
        'gpt-3.5-turbo': {'input': 0.0015, 'output': 0.002},
        'claude-3-opus': {'input': 0.015, 'output': 0.075},
    }

    def __init__(self, daily_budget: float = 10.0):
        """Create a limiter with the same ``daily_budget`` for every user."""
        self.daily_budget = daily_budget
        self.daily_spend = defaultdict(float)  # user_id -> spend today
        # user_id -> time of the last reset; first access stamps "now".
        self.last_reset = defaultdict(datetime.now)

    def calculate_cost(self, model: str, input_tokens: int, output_tokens: int) -> float:
        """Return the dollar cost of a request from the ``PRICING`` table.

        NOTE(review): a model missing from ``PRICING`` is billed at the
        'gpt-3.5-turbo' rates, the cheapest in the table. Confirm that an
        unknown model should not be priced higher or rejected.
        """
        pricing = self.PRICING.get(model, self.PRICING['gpt-3.5-turbo'])
        input_cost = (input_tokens / 1000) * pricing['input']
        output_cost = (output_tokens / 1000) * pricing['output']
        return input_cost + output_cost

    def check_budget(self, user_id: str, estimated_cost: float) -> dict:
        """Check whether ``estimated_cost`` fits the user's remaining budget.

        Returns ``{'allowed': True, 'remaining': r}`` when it fits, otherwise
        ``allowed=False`` with ``spent``, ``budget`` and ``remaining``.

        Raises:
            ValueError: if ``estimated_cost`` is negative; a negative
                estimate could pass even when the budget is overspent.
        """
        if estimated_cost < 0:
            raise ValueError("estimated_cost must be non-negative")
        self._reset_if_new_day(user_id)
        spent = self.daily_spend[user_id]
        remaining = self.daily_budget - spent
        if estimated_cost > remaining:
            return {
                'allowed': False,
                'spent': spent,
                'budget': self.daily_budget,
                'remaining': remaining
            }
        return {'allowed': True, 'remaining': remaining}

    def record_spend(self, user_id: str, cost: float):
        """Add the actual ``cost`` of a finished request to today's spend.

        Raises:
            ValueError: if ``cost`` is negative, since that would hand
                budget back to the user.
        """
        if cost < 0:
            raise ValueError("cost must be non-negative")
        # Roll the day over here too. Otherwise the first spend after
        # midnight lands on yesterday's total and is then wiped by the
        # next check_budget() call.
        self._reset_if_new_day(user_id)
        self.daily_spend[user_id] += cost

    def _reset_if_new_day(self, user_id: str):
        """Zero the user's spend when the calendar date has advanced."""
        now = datetime.now()
        if now.date() > self.last_reset[user_id].date():
            self.daily_spend[user_id] = 0.0
            self.last_reset[user_id] = now

# Tiered Rate Limits
Different limits for different user tiers:
from enum import Enum
class UserTier(Enum):
    """Subscription tiers; each value is the tier's lowercase name."""

    FREE = "free"
    PRO = "pro"
    ENTERPRISE = "enterprise"


def _tier_limits(requests_per_minute, tokens_per_day, max_output_tokens, daily_budget):
    """Assemble the limits record for one tier."""
    return {
        'requests_per_minute': requests_per_minute,
        'tokens_per_day': tokens_per_day,
        'max_output_tokens': max_output_tokens,
        'daily_budget': daily_budget,
    }


# Limits per tier: request rate, daily token allowance, cap on a single
# response, and daily dollar budget.
TIER_LIMITS = {
    UserTier.FREE: _tier_limits(10, 10000, 500, 1.0),
    UserTier.PRO: _tier_limits(60, 100000, 4000, 50.0),
    UserTier.ENTERPRISE: _tier_limits(300, 1000000, 8000, 500.0),
}
def get_user_limits(user_id: str) -> dict:
    """Return the rate limits that apply to ``user_id``.

    The tier is looked up with ``get_user_tier`` and mapped through
    ``TIER_LIMITS``; a tier missing from that table raises ``KeyError``.
    """
    tier = get_user_tier(user_id)  # From your user database
    # Return a copy: the table entry is shared by every user of the tier,
    # so a caller adjusting its result must not rewrite the global limits.
    return dict(TIER_LIMITS[tier])

# Input Validation
Prompt Size Limits
Reject oversized inputs before they reach the model:
def validate_prompt(prompt: str, max_chars: int = 10000, max_tokens: int = 4000) -> dict:
    """Validate a prompt before it is sent to the model.

    Args:
        prompt: The user-supplied prompt text.
        max_chars: Maximum accepted length in characters.
        max_tokens: Maximum accepted estimated token count.

    Returns:
        A dict with ``valid`` (bool), ``errors`` (list of messages, empty
        when valid) and ``estimated_tokens`` (``len(prompt) // 4``).
        A non-string ``prompt`` is reported as invalid rather than raising.
    """
    if not isinstance(prompt, str):
        return {
            'valid': False,
            'errors': ["Prompt must be a string"],
            'estimated_tokens': 0
        }

    errors = []

    # Length check
    if len(prompt) > max_chars:
        errors.append(f"Prompt exceeds maximum length of {max_chars} characters")

    # Estimated token check (rough: 4 chars per token)
    estimated_tokens = len(prompt) // 4
    if estimated_tokens > max_tokens:
        errors.append("Prompt likely exceeds token limit")

    # Empty check
    if not prompt.strip():
        errors.append("Prompt cannot be empty")

    return {
        'valid': not errors,
        'errors': errors,
        'estimated_tokens': estimated_tokens
    }

# Output Length Limits
Constrain maximum response length:
def generate_with_limits(prompt: str, user_tier: UserTier) -> str:
    """Generate a response capped at the tier's ``max_output_tokens``.

    The cap is passed to ``llm.generate`` as ``max_tokens``. As a second
    line of defence, a response longer than five characters per allowed
    token is cut at that length and marked as truncated.
    """
    max_output_tokens = TIER_LIMITS[user_tier]['max_output_tokens']

    response = llm.generate(
        prompt=prompt,
        max_tokens=max_output_tokens,
        # Stop generation at natural endpoints
        stop_sequences=["\n\n---", "THE END", "[END]"],
    )

    char_cap = max_output_tokens * 5  # ~5 chars per token
    if len(response) <= char_cap:
        return response
    # The model overran its limit anyway: trim and flag it.
    return response[:char_cap] + "... [truncated]"

# Authentication Patterns
API Key Rotation
Force regular key rotation to limit exposure:
from datetime import datetime, timedelta
class APIKeyManager:
    """Validate API keys and enforce a maximum key age.

    Key records are fetched through ``self.get_key_data(api_key)``, which
    this class does not define; it must be provided elsewhere (for example
    by a subclass). A record is read as a mapping with ``created_at``,
    ``user_id``, ``tier`` and an optional ``revoked`` flag.
    """

    def __init__(self, max_key_age_days: int = 90):
        """Create a manager that rejects keys older than ``max_key_age_days``."""
        self.max_key_age = timedelta(days=max_key_age_days)

    def validate_key(self, api_key: str) -> dict:
        """Validate ``api_key`` and check its age.

        Returns ``{'valid': False, 'error': ...}`` for an unknown, revoked
        or expired key. Otherwise returns ``valid=True`` with the record's
        ``user_id`` and ``tier``, plus ``warning``: a message when fewer
        than 14 days remain, else ``None``.
        """
        key_data = self.get_key_data(api_key)
        if not key_data:
            return {'valid': False, 'error': 'Invalid API key'}

        # .get(): a record without the flag is treated as not revoked
        # instead of raising KeyError.
        if key_data.get('revoked'):
            return {'valid': False, 'error': 'API key has been revoked'}

        created_at = key_data['created_at']
        # Take "now" in created_at's own timezone. Subtracting an aware
        # datetime from a naive one raises TypeError; tz=None keeps the
        # naive local-time behaviour for naive records.
        age = datetime.now(tz=created_at.tzinfo) - created_at
        if age > self.max_key_age:
            return {
                'valid': False,
                'error': 'API key expired. Please generate a new key.'
            }

        # Warn if key is approaching expiration
        days_remaining = (self.max_key_age - age).days
        warning = None
        if days_remaining < 14:
            warning = f"API key expires in {days_remaining} days"

        return {
            'valid': True,
            'user_id': key_data['user_id'],
            'tier': key_data['tier'],
            'warning': warning
        }

# Request Signing
Prevent replay attacks with signed requests:
import hashlib
import hmac
import json
import time
def sign_request(api_key: str, api_secret: str, payload: dict) -> dict:
    """Build the authentication headers for a signed API request.

    The signature is the hex HMAC-SHA256, keyed with ``api_secret``, of
    ``"<timestamp>:<payload JSON with sorted keys>"``. The timestamp is the
    current Unix time in whole seconds.

    Returns:
        A dict with ``X-API-Key``, ``X-Timestamp`` and ``X-Signature``.
    """
    timestamp = str(int(time.time()))

    # Sorted keys give signer and verifier the same canonical string.
    canonical_body = json.dumps(payload, sort_keys=True)
    signed_bytes = f"{timestamp}:{canonical_body}".encode()

    mac = hmac.new(api_secret.encode(), signed_bytes, hashlib.sha256)

    headers = {'X-API-Key': api_key}
    headers['X-Timestamp'] = timestamp
    headers['X-Signature'] = mac.hexdigest()
    return headers
def verify_request(headers: dict, payload: dict, max_age: int = 300) -> bool:
    """Verify a request's timestamp freshness and HMAC signature.

    Args:
        headers: Request headers; ``X-Timestamp``, ``X-Signature`` and
            ``X-API-Key`` are read with exactly that spelling.
        payload: The request body that was signed.
        max_age: Largest accepted difference, in seconds, between the
            timestamp and the current time.

    Returns:
        True only when the timestamp is within ``max_age`` of now and the
        signature matches. Malformed headers or an API key with no secret
        yield False rather than an exception, because every header here is
        attacker-controlled.

    NOTE(review): a captured request can still be replayed inside the
    ``max_age`` window; preventing that would need a nonce or request-ID
    store, which this function does not have.
    """
    signature = headers.get('X-Signature', '')
    api_key = headers.get('X-API-Key', '')

    # Check timestamp freshness (prevent replay). The parse and the
    # subtraction are both guarded: a non-numeric value raises
    # ValueError/TypeError, and an absurdly large integer overflows when
    # subtracted from a float.
    try:
        timestamp = int(headers.get('X-Timestamp', 0))
        stale = abs(time.time() - timestamp) > max_age
    except (TypeError, ValueError, OverflowError):
        return False
    if stale:
        return False

    # Verify signature
    api_secret = get_secret_for_key(api_key)
    if not api_secret or not isinstance(signature, str):
        return False

    payload_str = json.dumps(payload, sort_keys=True)
    expected = hmac.new(
        api_secret.encode(),
        f"{timestamp}:{payload_str}".encode(),
        hashlib.sha256
    ).hexdigest()

    # Compare as bytes: compare_digest() raises TypeError for str
    # arguments containing non-ASCII characters.
    return hmac.compare_digest(
        signature.encode('utf-8', errors='replace'),
        expected.encode()
    )

# Monitoring and Alerting
Anomaly Detection
Detect unusual usage patterns:
class UsageAnomalyDetector:
    """Flag API usage that is far above a user's baseline.

    ``self.baseline`` maps user_id to that user's average usage figures;
    users without an entry, or with an incomplete one, fall back to
    ``DEFAULT_BASELINE``.
    """

    # Figures assumed for a user with no recorded baseline.
    # NOTE(review): 'avg_cost_per_day' is not read by check_anomaly().
    DEFAULT_BASELINE = {
        'avg_requests_per_hour': 10,
        'avg_tokens_per_request': 500,
        'avg_cost_per_day': 1.0
    }

    def __init__(self):
        self.baseline = {}  # user_id -> average usage

    def check_anomaly(self, user_id: str, current_usage: dict) -> dict:
        """Compare ``current_usage`` with the user's baseline.

        Args:
            user_id: User whose baseline is looked up.
            current_usage: Must contain ``requests_per_hour`` and
                ``avg_tokens``.

        Returns:
            A dict with ``is_anomalous``, the list of ``anomalies`` and an
            ``action``: 'block' when any anomaly has high severity, 'alert'
            for lesser anomalies, and 'allow' when nothing was detected.
        """
        # Merge over the defaults so that a stored baseline missing a
        # field does not raise KeyError.
        baseline = {**self.DEFAULT_BASELINE, **self.baseline.get(user_id, {})}

        anomalies = []

        # Request spike
        if current_usage['requests_per_hour'] > baseline['avg_requests_per_hour'] * 5:
            anomalies.append({
                'type': 'request_spike',
                'severity': 'high',
                'current': current_usage['requests_per_hour'],
                'baseline': baseline['avg_requests_per_hour']
            })

        # Token usage spike
        if current_usage['avg_tokens'] > baseline['avg_tokens_per_request'] * 3:
            anomalies.append({
                'type': 'token_spike',
                'severity': 'medium',
                'current': current_usage['avg_tokens'],
                'baseline': baseline['avg_tokens_per_request']
            })

        # Normal traffic must not be reported as 'alert', or a caller
        # acting on 'action' alone would alert on every request.
        if not anomalies:
            action = 'allow'
        elif any(a['severity'] == 'high' for a in anomalies):
            action = 'block'
        else:
            action = 'alert'

        return {
            'is_anomalous': bool(anomalies),
            'anomalies': anomalies,
            'action': action
        }

# Cost Alerts
Get notified before costs spiral:
def check_cost_alerts(user_id: str, current_spend: float) -> list[dict]:
    """Return one alert for every budget threshold the spend has reached.

    The budget comes from ``get_user_budget(user_id)``. Thresholds are
    50%, 80% and 100% of it; every threshold at or below ``current_spend``
    produces an alert, in ascending order.
    """
    budget = get_user_budget(user_id)

    # (fraction of budget, severity, message)
    levels = (
        (0.5, 'warning', '50% of daily budget consumed'),
        (0.8, 'critical', '80% of daily budget consumed'),
        (1.0, 'limit', 'Daily budget exhausted'),
    )

    return [
        {
            'severity': severity,
            'message': message,
            'spend': current_spend,
            'budget': budget,
        }
        for fraction, severity, message in levels
        if current_spend >= budget * fraction
    ]

# Conclusion
Securing LLM APIs requires thinking beyond traditional API security. The key principles:
- Rate limit by cost, not just requests - A single expensive request can do more damage than a thousand cheap ones
- Validate inputs aggressively - Reject oversized, malformed, or suspicious prompts before they reach the model
- Implement tiered access - Not all users need the same limits; match resources to trust level
- Monitor for anomalies - Unusual patterns often indicate abuse or compromised credentials
The organizations that treat LLM API security as a first-class concern will avoid the costly surprises that come from unprotected endpoints. In a world where a single API call can cost dollars, security isn't just about data—it's about survival.