Published February 5, 2025
7 min read
Your LLM API is one of the most expensive endpoints in your infrastructure. A single malicious user can rack up thousands of dollars in API costs, exhaust your rate limits, or abuse your service for spam generation. Securing these endpoints isn't optional—it's essential for survival.
Traditional API security focuses on preventing unauthorized access and data breaches. LLM APIs have those concerns plus unique challenges:
# Cost comparison: Traditional API vs LLM API.
# A single LLM call is roughly a thousand times pricier than a typical REST call.
traditional_api_cost = 0.0001  # $0.0001 per request
llm_api_cost = 0.10  # $0.10 per request (1000x more expensive)

# Damage from a burst of 10,000 malicious requests:
traditional_damage = 10000 * traditional_api_cost  # $1
llm_damage = 10000 * llm_api_cost # $1,000

Attackers craft prompts designed to maximize token consumption:
# Malicious prompt designed to generate maximum output:
# a short, cheap input that requests an extremely long (costly) completion.
exhaustion_prompt = """
Write a comprehensive 10,000 word essay covering every aspect of quantum physics,
including detailed mathematical proofs, historical context, and future implications.
Include extensive code examples in Python, Java, and Rust.
"""
# Or using the model against itself: the instruction forces ever-growing repeated output
recursive_prompt = """
Repeat the following paragraph 100 times, adding one word each time:
"This is a test of the token exhaustion attack methodology."
"""Flooding the API with expensive requests to drain budgets or cause denial of service:
import asyncio
import aiohttp
async def prompt_bomb(api_url: str, num_requests: int = 1000):
    """Flood an unprotected LLM API with requests."""
    # Every request carries the same deliberately expensive prompt.
    payload = "Write a detailed 5000 word analysis..."
    async with aiohttp.ClientSession() as session:
        # Launch every POST concurrently -- nothing here throttles them.
        in_flight = [
            session.post(api_url, json={"prompt": payload})
            for _ in range(num_requests)
        ]
        await asyncio.gather(*in_flight)
# Without protection: $100+ in API costs in seconds

Using stolen API keys or session tokens to abuse the service:
# Attackers test leaked credentials against your API
# NOTE(review): attack demo -- assumes `requests` is imported; not shown in this snippet.
leaked_keys = ["sk-abc123...", "sk-def456...", "sk-ghi789..."]
# Credential stuffing: replay each leaked key against the endpoint.
for key in leaked_keys:
response = requests.post(
"https://your-llm-api.com/generate",
headers={"Authorization": f"Bearer {key}"},
json={"prompt": "Test"}
)
# HTTP 200 means the stolen key is still live and usable.
if response.status_code == 200:
print(f"Valid key found: {key}")

Creating multiple accounts to bypass usage limits:
# Automating account creation to bypass per-user limits
# NOTE(review): create_account / get_api_key are assumed project helpers -- not shown here.
# Each throwaway email yields a fresh account, and with it a fresh free-tier quota.
for i in range(100):
email = f"throwaway{i}@tempmail.com"
create_account(email)
api_key = get_api_key(email)
# Now attacker has 100x the free tier limit

Limit by tokens consumed, not just request count:
from datetime import datetime, timedelta
from collections import defaultdict
class TokenRateLimiter:
"""Rate limit based on tokens consumed, not request count."""
# Implements a sliding one-hour window of per-user usage records.
def __init__(self, tokens_per_hour: int = 100000):
self.tokens_per_hour = tokens_per_hour
# user_id -> list of {'tokens': int, 'timestamp': datetime} records
self.usage = defaultdict(list)
def check_limit(self, user_id: str, estimated_tokens: int) -> dict:
"""Check if user can make request with estimated token count."""
# Drop records older than one hour before summing current usage.
self._cleanup_old_usage(user_id)
current_usage = sum(t['tokens'] for t in self.usage[user_id])
# Deny when the estimate would push the user over the hourly cap.
if current_usage + estimated_tokens > self.tokens_per_hour:
return {
'allowed': False,
'current_usage': current_usage,
'limit': self.tokens_per_hour,
'retry_after': self._get_retry_after(user_id)
}
return {'allowed': True, 'current_usage': current_usage}
def record_usage(self, user_id: str, tokens_used: int):
"""Record actual token usage after request completes."""
# NOTE(review): datetime.now() is naive local time -- consider UTC for servers.
self.usage[user_id].append({
'tokens': tokens_used,
'timestamp': datetime.now()
})
def _cleanup_old_usage(self, user_id: str):
"""Remove usage records older than 1 hour."""
cutoff = datetime.now() - timedelta(hours=1)
self.usage[user_id] = [
u for u in self.usage[user_id]
if u['timestamp'] > cutoff
]
def _get_retry_after(self, user_id: str) -> int:
"""Calculate seconds until oldest usage record expires."""
# No recorded usage: the caller may retry immediately.
if not self.usage[user_id]:
return 0
oldest = min(u['timestamp'] for u in self.usage[user_id])
expires = oldest + timedelta(hours=1)
return max(0, int((expires - datetime.now()).total_seconds()))

Limit based on actual dollar cost:
class CostRateLimiter:
"""Rate limit based on API cost, not tokens or requests."""
# Pricing per 1K tokens (example rates)
PRICING = {
'gpt-4': {'input': 0.03, 'output': 0.06},
'gpt-3.5-turbo': {'input': 0.0015, 'output': 0.002},
'claude-3-opus': {'input': 0.015, 'output': 0.075},
}
def __init__(self, daily_budget: float = 10.0):
self.daily_budget = daily_budget
# user_id -> dollars spent since the user's last reset
self.daily_spend = defaultdict(float)
# NOTE(review): defaultdict(datetime.now) stamps the time of first ACCESS,
# not account creation -- confirm that is the intended reset anchor.
self.last_reset = defaultdict(datetime.now)
def calculate_cost(self, model: str, input_tokens: int, output_tokens: int) -> float:
"""Calculate cost for a request."""
# Unknown models fall back to gpt-3.5-turbo pricing.
pricing = self.PRICING.get(model, self.PRICING['gpt-3.5-turbo'])
input_cost = (input_tokens / 1000) * pricing['input']
output_cost = (output_tokens / 1000) * pricing['output']
return input_cost + output_cost
def check_budget(self, user_id: str, estimated_cost: float) -> dict:
"""Check if user has remaining budget."""
self._reset_if_new_day(user_id)
remaining = self.daily_budget - self.daily_spend[user_id]
if estimated_cost > remaining:
return {
'allowed': False,
'spent': self.daily_spend[user_id],
'budget': self.daily_budget,
'remaining': remaining
}
return {'allowed': True, 'remaining': remaining}
def record_spend(self, user_id: str, cost: float):
"""Record actual cost after request."""
self.daily_spend[user_id] += cost
def _reset_if_new_day(self, user_id: str):
"""Reset budget at midnight."""
# Compare calendar dates: the reset happens on the first call after midnight.
if datetime.now().date() > self.last_reset[user_id].date():
self.daily_spend[user_id] = 0
self.last_reset[user_id] = datetime.now()

Different limits for different user tiers:
from enum import Enum
class UserTier(Enum):
"""Subscription tiers used as keys into the TIER_LIMITS table."""
FREE = "free"
PRO = "pro"
ENTERPRISE = "enterprise"
# Per-tier limits: request rate, daily token allowance, output cap, and daily $ budget.
TIER_LIMITS = {
UserTier.FREE: {
'requests_per_minute': 10,
'tokens_per_day': 10000,
'max_output_tokens': 500,
'daily_budget': 1.0,
},
UserTier.PRO: {
'requests_per_minute': 60,
'tokens_per_day': 100000,
'max_output_tokens': 4000,
'daily_budget': 50.0,
},
UserTier.ENTERPRISE: {
'requests_per_minute': 300,
'tokens_per_day': 1000000,
'max_output_tokens': 8000,
'daily_budget': 500.0,
},
}
def get_user_limits(user_id: str) -> dict:
"""Get rate limits for user based on their tier."""
# NOTE(review): get_user_tier is a project helper (user database) -- not shown here.
tier = get_user_tier(user_id) # From your user database
return TIER_LIMITS[tier]

Reject oversized inputs before they reach the model:
def validate_prompt(prompt: str, max_chars: int = 10000) -> dict:
"""Validate prompt before processing."""
errors = []
# Length check
if len(prompt) > max_chars:
errors.append(f"Prompt exceeds maximum length of {max_chars} characters")
# Estimated token check (rough: 4 chars per token)
estimated_tokens = len(prompt) // 4
# NOTE(review): the 4000-token ceiling is hard-coded here, independent of max_chars.
if estimated_tokens > 4000:
errors.append("Prompt likely exceeds token limit")
# Empty check
if not prompt.strip():
errors.append("Prompt cannot be empty")
# Valid only when no check above produced an error.
return {
'valid': len(errors) == 0,
'errors': errors,
'estimated_tokens': estimated_tokens
}

Constrain maximum response length:
def generate_with_limits(prompt: str, user_tier: UserTier) -> str:
"""Generate response with output length limits."""
# Pull this tier's output cap from the shared TIER_LIMITS table.
limits = TIER_LIMITS[user_tier]
# NOTE(review): `llm` is an assumed module-level client -- not defined in this snippet.
response = llm.generate(
prompt=prompt,
max_tokens=limits['max_output_tokens'],
# Stop generation at natural endpoints
stop_sequences=["\n\n---", "THE END", "[END]"]
)
# Truncate if somehow exceeded (defensive)
if len(response) > limits['max_output_tokens'] * 5: # ~5 chars per token
response = response[:limits['max_output_tokens'] * 5] + "... [truncated]"
return response

Force regular key rotation to limit exposure:
from datetime import datetime, timedelta
class APIKeyManager:
"""Manage API keys with automatic rotation."""
def __init__(self, max_key_age_days: int = 90):
# Keys older than this are rejected outright.
self.max_key_age = timedelta(days=max_key_age_days)
def validate_key(self, api_key: str) -> dict:
"""Validate API key and check age."""
# NOTE(review): get_key_data is expected from the key store -- not shown in this snippet.
key_data = self.get_key_data(api_key)
if not key_data:
return {'valid': False, 'error': 'Invalid API key'}
if key_data['revoked']:
return {'valid': False, 'error': 'API key has been revoked'}
# Reject keys past the maximum age (forced rotation).
age = datetime.now() - key_data['created_at']
if age > self.max_key_age:
return {
'valid': False,
'error': 'API key expired. Please generate a new key.'
}
# Warn if key is approaching expiration
days_remaining = (self.max_key_age - age).days
warning = None
if days_remaining < 14:
warning = f"API key expires in {days_remaining} days"
return {
'valid': True,
'user_id': key_data['user_id'],
'tier': key_data['tier'],
'warning': warning
}

Prevent replay attacks with signed requests:
import hashlib
import hmac
import json
import time
def sign_request(api_key: str, api_secret: str, payload: dict) -> dict:
    """Sign API request to prevent tampering and replay.

    Returns headers carrying the key id, a unix timestamp, and an
    HMAC-SHA256 signature over "<timestamp>:<canonical payload>".
    The timestamp lets the server reject stale (replayed) requests.
    """
    timestamp = str(int(time.time()))
    # Canonicalize the payload (sorted keys) so client and server hash identical bytes.
    payload_str = json.dumps(payload, sort_keys=True)
    # Create signature over the timestamp-bound message.
    message = f"{timestamp}:{payload_str}"
    signature = hmac.new(
        api_secret.encode(),
        message.encode(),
        hashlib.sha256
    ).hexdigest()
    return {
        'X-API-Key': api_key,
        'X-Timestamp': timestamp,
        'X-Signature': signature
    }
def verify_request(headers: dict, payload: dict, max_age: int = 300) -> bool:
"""Verify request signature and timestamp."""
timestamp = int(headers.get('X-Timestamp', 0))
signature = headers.get('X-Signature', '')
api_key = headers.get('X-API-Key', '')
# Check timestamp freshness (prevent replay)
if abs(time.time() - timestamp) > max_age:
return False
# Verify signature
# NOTE(review): get_secret_for_key is a project lookup helper -- not shown here.
api_secret = get_secret_for_key(api_key)
payload_str = json.dumps(payload, sort_keys=True)
# Recompute the HMAC over the same "<timestamp>:<payload>" message the client signed.
expected = hmac.new(
api_secret.encode(),
f"{timestamp}:{payload_str}".encode(),
hashlib.sha256
).hexdigest()
return hmac.compare_digest(signature, expected)

Detect unusual usage patterns:
class UsageAnomalyDetector:
"""Detect anomalous API usage patterns."""
def __init__(self):
self.baseline = {} # user_id -> average usage
def check_anomaly(self, user_id: str, current_usage: dict) -> dict:
"""Check if current usage is anomalous."""
# Users without history fall back to these default baselines.
baseline = self.baseline.get(user_id, {
'avg_requests_per_hour': 10,
'avg_tokens_per_request': 500,
'avg_cost_per_day': 1.0
})
anomalies = []
# Request spike: more than 5x the usual hourly request rate.
if current_usage['requests_per_hour'] > baseline['avg_requests_per_hour'] * 5:
anomalies.append({
'type': 'request_spike',
'severity': 'high',
'current': current_usage['requests_per_hour'],
'baseline': baseline['avg_requests_per_hour']
})
# Token usage spike: more than 3x the usual tokens per request.
if current_usage['avg_tokens'] > baseline['avg_tokens_per_request'] * 3:
anomalies.append({
'type': 'token_spike',
'severity': 'medium',
'current': current_usage['avg_tokens'],
'baseline': baseline['avg_tokens_per_request']
})
# Any high-severity anomaly blocks the user; medium-only findings just alert.
return {
'is_anomalous': len(anomalies) > 0,
'anomalies': anomalies,
'action': 'block' if any(a['severity'] == 'high' for a in anomalies) else 'alert'
}

Get notified before costs spiral:
def check_cost_alerts(user_id: str, current_spend: float) -> list[dict]:
"""Check if any cost thresholds have been crossed."""
alerts = []
# NOTE(review): get_user_budget is a project helper -- not shown here.
budget = get_user_budget(user_id)
# (fraction of budget, severity, human-readable message)
thresholds = [
(0.5, 'warning', '50% of daily budget consumed'),
(0.8, 'critical', '80% of daily budget consumed'),
(1.0, 'limit', 'Daily budget exhausted'),
]
# Every crossed threshold fires, so spend past 100% yields all three alerts.
for threshold, severity, message in thresholds:
if current_spend >= budget * threshold:
alerts.append({
'severity': severity,
'message': message,
'spend': current_spend,
'budget': budget
})
return alerts

Securing LLM APIs requires thinking beyond traditional API security. The key principles:
The organizations that treat LLM API security as a first-class concern will avoid the costly surprises that come from unprotected endpoints. In a world where a single API call can cost dollars, security isn't just about data—it's about survival.