Skip to main contentSkip to navigation
ai-security
api-security
llm
infrastructure

Securing LLM API Endpoints in Production

Protecting your LLM APIs from abuse, token exhaustion, and unauthorized access with practical rate limiting and authentication patterns.

Publié February 5, 2025

7 min de lecture

Your LLM API is one of the most expensive endpoints in your infrastructure. A single malicious user can rack up thousands of dollars in API costs, exhaust your rate limits, or abuse your service for spam generation. Securing these endpoints isn't optional—it's essential for survival.

Why LLM APIs Are Different

Traditional API security focuses on preventing unauthorized access and data breaches. LLM APIs have those concerns plus unique challenges:

  • Cost amplification: A single request can cost $0.10 or more in token processing
  • Compute intensity: Each request consumes significant GPU resources
  • Output unpredictability: Responses vary wildly in length and cost
  • Abuse potential: Your LLM can be weaponized for spam, phishing, or harmful content generation
# Cost comparison: Traditional API vs LLM API
traditional_api_cost = 0.0001  # $0.0001 per request
llm_api_cost = 0.10           # $0.10 per request (1000x more expensive)
 
# 10,000 malicious requests
traditional_damage = 10000 * traditional_api_cost  # $1
llm_damage = 10000 * llm_api_cost                  # $1,000

Attack Vectors

Token Exhaustion

Attackers craft prompts designed to maximize token consumption:

# Malicious prompt designed to generate maximum output
exhaustion_prompt = """
Write a comprehensive 10,000 word essay covering every aspect of quantum physics,
including detailed mathematical proofs, historical context, and future implications.
Include extensive code examples in Python, Java, and Rust.
"""
 
# Or using the model against itself
recursive_prompt = """
Repeat the following paragraph 100 times, adding one word each time:
"This is a test of the token exhaustion attack methodology."
"""

Prompt Bombing

Flooding the API with expensive requests to drain budgets or cause denial of service:

import asyncio
import aiohttp
 
async def prompt_bomb(api_url: str, num_requests: int = 1000):
    """Flood an unprotected LLM API with requests."""
    expensive_prompt = "Write a detailed 5000 word analysis..."
 
    async with aiohttp.ClientSession() as session:
        tasks = [
            session.post(api_url, json={"prompt": expensive_prompt})
            for _ in range(num_requests)
        ]
        await asyncio.gather(*tasks)
 
# Without protection: $100+ in API costs in seconds

Credential Stuffing

Using stolen API keys or session tokens to abuse the service:

# Attackers test leaked credentials against your API
leaked_keys = ["sk-abc123...", "sk-def456...", "sk-ghi789..."]
 
for key in leaked_keys:
    response = requests.post(
        "https://your-llm-api.com/generate",
        headers={"Authorization": f"Bearer {key}"},
        json={"prompt": "Test"}
    )
    if response.status_code == 200:
        print(f"Valid key found: {key}")

Free Tier Abuse

Creating multiple accounts to bypass usage limits:

# Automating account creation to bypass per-user limits
for i in range(100):
    email = f"throwaway{i}@tempmail.com"
    create_account(email)
    api_key = get_api_key(email)
    # Now attacker has 100x the free tier limit

Rate Limiting Strategies

Token-Based Rate Limiting

Limit by tokens consumed, not just request count:

from datetime import datetime, timedelta
from collections import defaultdict
 
class TokenRateLimiter:
    """Rate limit based on tokens consumed, not request count."""
 
    def __init__(self, tokens_per_hour: int = 100000):
        self.tokens_per_hour = tokens_per_hour
        self.usage = defaultdict(list)
 
    def check_limit(self, user_id: str, estimated_tokens: int) -> dict:
        """Check if user can make request with estimated token count."""
        self._cleanup_old_usage(user_id)
 
        current_usage = sum(t['tokens'] for t in self.usage[user_id])
 
        if current_usage + estimated_tokens > self.tokens_per_hour:
            return {
                'allowed': False,
                'current_usage': current_usage,
                'limit': self.tokens_per_hour,
                'retry_after': self._get_retry_after(user_id)
            }
 
        return {'allowed': True, 'current_usage': current_usage}
 
    def record_usage(self, user_id: str, tokens_used: int):
        """Record actual token usage after request completes."""
        self.usage[user_id].append({
            'tokens': tokens_used,
            'timestamp': datetime.now()
        })
 
    def _cleanup_old_usage(self, user_id: str):
        """Remove usage records older than 1 hour."""
        cutoff = datetime.now() - timedelta(hours=1)
        self.usage[user_id] = [
            u for u in self.usage[user_id]
            if u['timestamp'] > cutoff
        ]
 
    def _get_retry_after(self, user_id: str) -> int:
        """Calculate seconds until oldest usage record expires."""
        if not self.usage[user_id]:
            return 0
        oldest = min(u['timestamp'] for u in self.usage[user_id])
        expires = oldest + timedelta(hours=1)
        return max(0, int((expires - datetime.now()).total_seconds()))

Cost-Based Rate Limiting

Limit based on actual dollar cost:

class CostRateLimiter:
    """Rate limit based on API cost, not tokens or requests."""
 
    # Pricing per 1K tokens (example rates)
    PRICING = {
        'gpt-4': {'input': 0.03, 'output': 0.06},
        'gpt-3.5-turbo': {'input': 0.0015, 'output': 0.002},
        'claude-3-opus': {'input': 0.015, 'output': 0.075},
    }
 
    def __init__(self, daily_budget: float = 10.0):
        self.daily_budget = daily_budget
        self.daily_spend = defaultdict(float)
        self.last_reset = defaultdict(datetime.now)
 
    def calculate_cost(self, model: str, input_tokens: int, output_tokens: int) -> float:
        """Calculate cost for a request."""
        pricing = self.PRICING.get(model, self.PRICING['gpt-3.5-turbo'])
        input_cost = (input_tokens / 1000) * pricing['input']
        output_cost = (output_tokens / 1000) * pricing['output']
        return input_cost + output_cost
 
    def check_budget(self, user_id: str, estimated_cost: float) -> dict:
        """Check if user has remaining budget."""
        self._reset_if_new_day(user_id)
 
        remaining = self.daily_budget - self.daily_spend[user_id]
 
        if estimated_cost > remaining:
            return {
                'allowed': False,
                'spent': self.daily_spend[user_id],
                'budget': self.daily_budget,
                'remaining': remaining
            }
 
        return {'allowed': True, 'remaining': remaining}
 
    def record_spend(self, user_id: str, cost: float):
        """Record actual cost after request."""
        self.daily_spend[user_id] += cost
 
    def _reset_if_new_day(self, user_id: str):
        """Reset budget at midnight."""
        if datetime.now().date() > self.last_reset[user_id].date():
            self.daily_spend[user_id] = 0
            self.last_reset[user_id] = datetime.now()

Tiered Rate Limits

Different limits for different user tiers:

from enum import Enum
 
class UserTier(Enum):
    FREE = "free"
    PRO = "pro"
    ENTERPRISE = "enterprise"
 
TIER_LIMITS = {
    UserTier.FREE: {
        'requests_per_minute': 10,
        'tokens_per_day': 10000,
        'max_output_tokens': 500,
        'daily_budget': 1.0,
    },
    UserTier.PRO: {
        'requests_per_minute': 60,
        'tokens_per_day': 100000,
        'max_output_tokens': 4000,
        'daily_budget': 50.0,
    },
    UserTier.ENTERPRISE: {
        'requests_per_minute': 300,
        'tokens_per_day': 1000000,
        'max_output_tokens': 8000,
        'daily_budget': 500.0,
    },
}
 
def get_user_limits(user_id: str) -> dict:
    """Get rate limits for user based on their tier."""
    tier = get_user_tier(user_id)  # From your user database
    return TIER_LIMITS[tier]

Input Validation

Prompt Size Limits

Reject oversized inputs before they reach the model:

def validate_prompt(prompt: str, max_chars: int = 10000) -> dict:
    """Validate prompt before processing."""
    errors = []
 
    # Length check
    if len(prompt) > max_chars:
        errors.append(f"Prompt exceeds maximum length of {max_chars} characters")
 
    # Estimated token check (rough: 4 chars per token)
    estimated_tokens = len(prompt) // 4
    if estimated_tokens > 4000:
        errors.append("Prompt likely exceeds token limit")
 
    # Empty check
    if not prompt.strip():
        errors.append("Prompt cannot be empty")
 
    return {
        'valid': len(errors) == 0,
        'errors': errors,
        'estimated_tokens': estimated_tokens
    }

Output Length Limits

Constrain maximum response length:

def generate_with_limits(prompt: str, user_tier: UserTier) -> str:
    """Generate response with output length limits."""
    limits = TIER_LIMITS[user_tier]
 
    response = llm.generate(
        prompt=prompt,
        max_tokens=limits['max_output_tokens'],
        # Stop generation at natural endpoints
        stop_sequences=["\n\n---", "THE END", "[END]"]
    )
 
    # Truncate if somehow exceeded (defensive)
    if len(response) > limits['max_output_tokens'] * 5:  # ~5 chars per token
        response = response[:limits['max_output_tokens'] * 5] + "... [truncated]"
 
    return response

Authentication Patterns

API Key Rotation

Force regular key rotation to limit exposure:

from datetime import datetime, timedelta
 
class APIKeyManager:
    """Manage API keys with automatic rotation."""
 
    def __init__(self, max_key_age_days: int = 90):
        self.max_key_age = timedelta(days=max_key_age_days)
 
    def validate_key(self, api_key: str) -> dict:
        """Validate API key and check age."""
        key_data = self.get_key_data(api_key)
 
        if not key_data:
            return {'valid': False, 'error': 'Invalid API key'}
 
        if key_data['revoked']:
            return {'valid': False, 'error': 'API key has been revoked'}
 
        age = datetime.now() - key_data['created_at']
        if age > self.max_key_age:
            return {
                'valid': False,
                'error': 'API key expired. Please generate a new key.'
            }
 
        # Warn if key is approaching expiration
        days_remaining = (self.max_key_age - age).days
        warning = None
        if days_remaining < 14:
            warning = f"API key expires in {days_remaining} days"
 
        return {
            'valid': True,
            'user_id': key_data['user_id'],
            'tier': key_data['tier'],
            'warning': warning
        }

Request Signing

Prevent replay attacks with signed requests:

import hmac
import hashlib
import time
 
def sign_request(api_key: str, api_secret: str, payload: dict) -> dict:
    """Sign API request to prevent tampering and replay."""
    timestamp = str(int(time.time()))
    payload_str = json.dumps(payload, sort_keys=True)
 
    # Create signature
    message = f"{timestamp}:{payload_str}"
    signature = hmac.new(
        api_secret.encode(),
        message.encode(),
        hashlib.sha256
    ).hexdigest()
 
    return {
        'X-API-Key': api_key,
        'X-Timestamp': timestamp,
        'X-Signature': signature
    }
 
def verify_request(headers: dict, payload: dict, max_age: int = 300) -> bool:
    """Verify request signature and timestamp."""
    timestamp = int(headers.get('X-Timestamp', 0))
    signature = headers.get('X-Signature', '')
    api_key = headers.get('X-API-Key', '')
 
    # Check timestamp freshness (prevent replay)
    if abs(time.time() - timestamp) > max_age:
        return False
 
    # Verify signature
    api_secret = get_secret_for_key(api_key)
    payload_str = json.dumps(payload, sort_keys=True)
    expected = hmac.new(
        api_secret.encode(),
        f"{timestamp}:{payload_str}".encode(),
        hashlib.sha256
    ).hexdigest()
 
    return hmac.compare_digest(signature, expected)

Monitoring and Alerting

Anomaly Detection

Detect unusual usage patterns:

class UsageAnomalyDetector:
    """Detect anomalous API usage patterns."""
 
    def __init__(self):
        self.baseline = {}  # user_id -> average usage
 
    def check_anomaly(self, user_id: str, current_usage: dict) -> dict:
        """Check if current usage is anomalous."""
        baseline = self.baseline.get(user_id, {
            'avg_requests_per_hour': 10,
            'avg_tokens_per_request': 500,
            'avg_cost_per_day': 1.0
        })
 
        anomalies = []
 
        # Request spike
        if current_usage['requests_per_hour'] > baseline['avg_requests_per_hour'] * 5:
            anomalies.append({
                'type': 'request_spike',
                'severity': 'high',
                'current': current_usage['requests_per_hour'],
                'baseline': baseline['avg_requests_per_hour']
            })
 
        # Token usage spike
        if current_usage['avg_tokens'] > baseline['avg_tokens_per_request'] * 3:
            anomalies.append({
                'type': 'token_spike',
                'severity': 'medium',
                'current': current_usage['avg_tokens'],
                'baseline': baseline['avg_tokens_per_request']
            })
 
        return {
            'is_anomalous': len(anomalies) > 0,
            'anomalies': anomalies,
            'action': 'block' if any(a['severity'] == 'high' for a in anomalies) else 'alert'
        }

Cost Alerts

Get notified before costs spiral:

def check_cost_alerts(user_id: str, current_spend: float) -> list[dict]:
    """Check if any cost thresholds have been crossed."""
    alerts = []
    budget = get_user_budget(user_id)
 
    thresholds = [
        (0.5, 'warning', '50% of daily budget consumed'),
        (0.8, 'critical', '80% of daily budget consumed'),
        (1.0, 'limit', 'Daily budget exhausted'),
    ]
 
    for threshold, severity, message in thresholds:
        if current_spend >= budget * threshold:
            alerts.append({
                'severity': severity,
                'message': message,
                'spend': current_spend,
                'budget': budget
            })
 
    return alerts

Conclusion

Securing LLM APIs requires thinking beyond traditional API security. The key principles:

  1. Rate limit by cost, not just requests - A single expensive request can do more damage than a thousand cheap ones
  2. Validate inputs aggressively - Reject oversized, malformed, or suspicious prompts before they reach the model
  3. Implement tiered access - Not all users need the same limits; match resources to trust level
  4. Monitor for anomalies - Unusual patterns often indicate abuse or compromised credentials

The organizations that treat LLM API security as a first-class concern will avoid the costly surprises that come from unprotected endpoints. In a world where a single API call can cost dollars, security isn't just about data—it's about survival.

Securing LLM API Endpoints in Production | Musah Abdulai