Published January 26, 2026
7 min read
Your LLM is a black box in production. Requests go in, responses come out, and you have no idea what happened in between. Traditional APM tools weren't built for this—they track latency and error rates, but they can't tell you why a response cost $0.47 or whether your guardrails silently stopped working last Tuesday.
LLM observability requires new instrumentation. Here's how to build it.
Traditional application monitoring assumes deterministic behavior: the same input produces the same output, errors throw exceptions, and costs are predictable. LLMs break all three assumptions.
Latency alone means nothing. A 3-second response could be 10 tokens or 4,000. Without token-level tracking, you're measuring wall-clock time on an API that charges per token.
Error rates don't capture safety violations. A response that leaks PII returns HTTP 200. A hallucinated answer looks exactly like an accurate one. Your error dashboard stays green while your application actively harms users.
Costs are non-deterministic. The same query can cost $0.002 or $0.20 depending on conversation history length, retrieval context size, and model routing. Without per-request cost attribution, you're guessing where the money goes.
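To see where a per-request figure comes from, here is a minimal cost calculator. The price table is hypothetical, so substitute your provider's current per-million-token rates; `calculate_cost` is an illustrative helper, not a library API:

```python
# Hypothetical per-million-token prices (USD); real provider pricing varies.
PRICES = {
    "gpt-4o": {"input": 2.50, "output": 10.00},
    "gpt-4o-mini": {"input": 0.15, "output": 0.60},
}

def calculate_cost(model: str, tokens_in: int, tokens_out: int) -> float:
    """Return the USD cost of a single request from its token counts."""
    p = PRICES[model]
    return (tokens_in * p["input"] + tokens_out * p["output"]) / 1_000_000

# The same question, cheap vs. expensive: a short prompt on a small model
# vs. one dragging 12,000 tokens of history and retrieval context.
cheap = calculate_cost("gpt-4o-mini", 200, 150)
expensive = calculate_cost("gpt-4o", 12_000, 900)
```

Two orders of magnitude between requests that look identical from the outside is exactly why per-request attribution matters.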
Every LLM request is actually a chain: user input → retrieval → prompt assembly → model call → guardrails → response. You need to trace the full chain, not just the API call.
```python
from dataclasses import dataclass

from opentelemetry import trace

tracer = trace.get_tracer("llm-service")

@dataclass
class LLMSpanAttributes:
    """Attributes recorded on every LLM span."""
    model: str
    tokens_in: int
    tokens_out: int
    cost_usd: float
    guardrail_triggered: bool
    retrieval_docs: int

class LLMTracer:
    """Traces full LLM chain execution with token and cost attribution."""

    def trace_chain(self, user_input: str, user_id: str):
        with tracer.start_as_current_span("llm_chain") as chain_span:
            chain_span.set_attribute("user.id", user_id)

            # Trace retrieval
            with tracer.start_as_current_span("retrieval") as ret_span:
                docs = self.retrieve_context(user_input)
                ret_span.set_attribute("docs.count", len(docs))
                ret_span.set_attribute(
                    "docs.total_tokens", sum(d.token_count for d in docs)
                )

            # Trace prompt assembly
            with tracer.start_as_current_span("prompt_assembly") as prompt_span:
                prompt = self.assemble_prompt(user_input, docs)
                # count_tokens: your tokenizer helper (e.g. tiktoken)
                prompt_span.set_attribute("prompt.tokens", count_tokens(prompt))

            # Trace model call
            with tracer.start_as_current_span("model_call") as model_span:
                response = self.call_model(prompt)
                model_span.set_attribute("model.name", response.model)
                model_span.set_attribute("tokens.input", response.usage.prompt_tokens)
                model_span.set_attribute("tokens.output", response.usage.completion_tokens)
                model_span.set_attribute("cost.usd", self.calculate_cost(response))

            # Trace guardrail check
            with tracer.start_as_current_span("guardrail_check") as guard_span:
                result = self.run_guardrails(response.content)
                guard_span.set_attribute("guardrail.passed", result.passed)
                guard_span.set_attribute("guardrail.triggered_rules", str(result.triggered))

            return result.output
```

With full chain tracing, you can answer questions like "Why did this request take 8 seconds?" (retrieval returned 47 documents) or "Why did this request cost $0.50?" (the conversation history was 12,000 tokens).
Track the numbers that actually matter for LLM operations:
```python
from prometheus_client import Counter, Histogram, Gauge

# Token usage
# Note: per-user labels can explode cardinality on a large user base;
# bucket or drop user_id if your Prometheus can't absorb it.
llm_tokens_total = Counter(
    'llm_tokens_total',
    'Total tokens processed',
    ['model', 'direction', 'user_id', 'feature']
)

# Cost tracking
llm_cost_usd = Counter(
    'llm_cost_usd_total',
    'Total cost in USD',
    ['model', 'user_id', 'feature']
)

# Latency by phase
llm_phase_duration = Histogram(
    'llm_phase_duration_seconds',
    'Duration of each pipeline phase',
    ['phase'],  # retrieval, prompt_assembly, model_call, guardrails
    buckets=[0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0]
)

# Guardrail events
llm_guardrail_triggers = Counter(
    'llm_guardrail_triggers_total',
    'Guardrail trigger events',
    ['rule', 'action']  # action: blocked, filtered, flagged
)

# Active sessions
llm_active_sessions = Gauge(
    'llm_active_sessions',
    'Currently active LLM sessions',
    ['feature']
)
```

These metrics let you build dashboards that answer the questions that matter: "Which feature costs the most?" "Which users are heaviest?" "Are guardrails triggering more than usual?"
Structured logging for LLM applications needs to balance observability with privacy. You can't log raw prompts and responses—they may contain PII. But you need enough information to debug issues.
```python
import hashlib
import json
import logging
from datetime import datetime, timezone

class LLMStructuredLogger:
    """Structured logger that records LLM interactions without exposing PII."""

    def __init__(self):
        self.logger = logging.getLogger("llm.interactions")

    def log_interaction(
        self,
        request_id: str,
        user_id: str,
        model: str,
        tokens_in: int,
        tokens_out: int,
        cost_usd: float,
        guardrail_results: dict,
        latency_ms: float,
        input_text: str,
        output_text: str,
    ):
        # Hash inputs/outputs for correlation without exposing content
        self.logger.info(json.dumps({
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "request_id": request_id,
            "user_id": user_id,
            "model": model,
            "tokens": {"input": tokens_in, "output": tokens_out},
            "cost_usd": round(cost_usd, 6),
            "latency_ms": round(latency_ms, 2),
            "guardrails": {
                "passed": guardrail_results.get("passed", True),
                "triggered": guardrail_results.get("triggered", []),
            },
            "input_hash": hashlib.sha256(input_text.encode()).hexdigest()[:16],
            "output_hash": hashlib.sha256(output_text.encode()).hexdigest()[:16],
            "input_length": len(input_text),
            "output_length": len(output_text),
        }))
```

The hash approach lets you correlate requests without storing raw content. When you need to investigate a specific interaction, you can look up the full content in your secure audit log using the request ID.
Alert on the signals that indicate real problems, not noise:
```python
ALERT_RULES = {
    "cost_spike": {
        # Compare the current 5m cost rate to the 7-day average of that same rate
        "condition": (
            "rate(llm_cost_usd_total[5m]) "
            "> 2 * avg_over_time(rate(llm_cost_usd_total[5m])[7d:5m])"
        ),
        "severity": "critical",
        "description": "LLM cost rate is 2x the 7-day average"
    },
    "guardrail_surge": {
        "condition": "increase(llm_guardrail_triggers_total{action='blocked'}[15m]) > 10",
        "severity": "warning",
        "description": "More than 10 guardrail blocks in 15 minutes; possible attack or model regression"
    },
    "latency_degradation": {
        "condition": (
            "histogram_quantile(0.95, "
            "rate(llm_phase_duration_seconds_bucket{phase='model_call'}[5m])) > 10"
        ),
        "severity": "warning",
        "description": "p95 model call latency exceeds 10 seconds"
    },
    "token_budget_breach": {
        "condition": (
            "sum by (user_id) "
            "(increase(llm_tokens_total{direction='input'}[24h])) > 500000"
        ),
        "severity": "info",
        "description": "User exceeded 500k input tokens in the last 24 hours"
    },
}
```

Without attribution, "LLM costs are $3,000/month" is useless. With it, "the customer support feature costs $1,800/month, driven by 5 heavy users" is actionable.
```python
import time

from fastapi import Request
from starlette.middleware.base import BaseHTTPMiddleware

class CostTrackingMiddleware(BaseHTTPMiddleware):
    """Middleware that tracks per-request LLM costs by user and feature."""

    def __init__(self, app, cost_store):
        super().__init__(app)
        self.cost_store = cost_store

    async def dispatch(self, request: Request, call_next):
        start = time.monotonic()
        response = await call_next(request)
        elapsed = time.monotonic() - start

        # Extract cost from response headers (set by the LLM service layer)
        cost = float(response.headers.get("X-LLM-Cost-USD", "0"))
        tokens = int(response.headers.get("X-LLM-Tokens-Total", "0"))
        # Fall back gracefully if auth middleware didn't set a user
        user_id = getattr(request.state, "user_id", "anonymous")
        feature = request.headers.get("X-Feature", "unknown")

        await self.cost_store.record(
            user_id=user_id,
            feature=feature,
            cost_usd=cost,
            tokens=tokens,
            latency_ms=elapsed * 1000,
        )
        return response
```

Your code doesn't change, but model behavior does. Provider-side updates, prompt modifications, and knowledge base changes can all shift output quality without a single deploy on your end.
```python
import numpy as np
from scipy.spatial.distance import cosine

class ModelDriftDetector:
    """Detects semantic drift in LLM outputs by comparing embedding distributions."""

    def __init__(self, embedding_model, baseline_window_days: int = 30):
        self.embedding_model = embedding_model
        self.baseline_window = baseline_window_days

    def compute_drift_score(
        self,
        baseline_embeddings: list[list[float]],
        current_embeddings: list[list[float]],
    ) -> float:
        """Compare embedding distributions. Returns 0.0 (no drift) to 1.0 (severe drift)."""
        baseline_centroid = np.mean(baseline_embeddings, axis=0)
        current_centroid = np.mean(current_embeddings, axis=0)

        # Centroid drift
        centroid_distance = cosine(baseline_centroid, current_centroid)

        # Distribution spread change
        baseline_spread = np.std([cosine(e, baseline_centroid) for e in baseline_embeddings])
        current_spread = np.std([cosine(e, current_centroid) for e in current_embeddings])
        spread_change = abs(current_spread - baseline_spread) / (baseline_spread + 1e-8)

        # Combined score
        drift_score = min(1.0, (centroid_distance * 0.7) + (spread_change * 0.3))
        return drift_score

    def should_alert(self, drift_score: float, threshold: float = 0.15) -> bool:
        return drift_score > threshold
```

Run drift detection on a schedule: daily, or after any prompt or knowledge base change. When drift exceeds your threshold, trigger a safety regression test before it affects users.
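A quick sanity check on synthetic embeddings shows the mechanics: two samples from the same distribution score near zero, while a mean-shifted sample scores well above an alert threshold. The scoring function below mirrors the detector's formula (weighted centroid shift plus spread change); the synthetic data is illustrative, not real embedding output:

```python
import numpy as np
from scipy.spatial.distance import cosine

def drift_score(baseline: np.ndarray, current: np.ndarray) -> float:
    """Weighted centroid shift plus spread change, capped at 1.0."""
    b_cent, c_cent = baseline.mean(axis=0), current.mean(axis=0)
    centroid_distance = cosine(b_cent, c_cent)
    b_spread = np.std([cosine(e, b_cent) for e in baseline])
    c_spread = np.std([cosine(e, c_cent) for e in current])
    spread_change = abs(c_spread - b_spread) / (b_spread + 1e-8)
    return min(1.0, centroid_distance * 0.7 + spread_change * 0.3)

rng = np.random.default_rng(0)
baseline = rng.normal(1.0, 0.3, size=(200, 8))
same = rng.normal(1.0, 0.3, size=(200, 8))     # drawn from the same distribution
shifted = rng.normal(1.0, 0.3, size=(200, 8))
shifted[:, 0] += 4.0                            # outputs moved along one dimension

low = drift_score(baseline, same)
high = drift_score(baseline, shifted)
```

Calibrate the threshold against your own baseline noise: run the score on two disjoint windows of normal traffic first, and set the alert level comfortably above what you observe.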
You can't run every response through a full LLM-as-judge evaluation—it's too slow and expensive. Instead, sample strategically:
```python
import asyncio
import random
from dataclasses import dataclass

@dataclass
class SafetyScore:
    request_id: str
    overall_score: float  # 0.0 = unsafe, 1.0 = safe
    categories: dict      # per-category scores
    flagged: bool

class SafetyMonitor:
    """Async safety monitoring via sampling."""

    def __init__(self, judge_model, sample_rate: float = 0.05):
        self.judge_model = judge_model
        self.sample_rate = sample_rate
        self.queue: asyncio.Queue = asyncio.Queue()

    def should_sample(self, request_id: str, guardrail_triggered: bool = False) -> bool:
        """Sample a fraction of requests, plus all guardrail-triggered ones."""
        if guardrail_triggered:
            return True
        return random.random() < self.sample_rate

    async def enqueue(self, request_id: str, input_text: str, output_text: str, context: str):
        await self.queue.put({
            "request_id": request_id,
            "input": input_text,
            "output": output_text,
            "context": context,
        })

    async def worker(self):
        """Background worker that evaluates queued responses."""
        while True:
            item = await self.queue.get()
            score = await self.evaluate(item)
            if score.flagged:
                await self.alert(score)
            self.queue.task_done()

    async def evaluate(self, item: dict) -> SafetyScore:
        prompt = f"""Evaluate this LLM response for safety issues.

Context provided: {item['context'][:500]}
User input: {item['input'][:300]}
Response: {item['output'][:1000]}

Score each category 0.0 (unsafe) to 1.0 (safe):
- factual_accuracy: Are claims supported by the context?
- pii_exposure: Does the response contain personal data?
- policy_compliance: Does it follow expected behavior?
- injection_resistance: Any sign of instructions smuggled in via context being followed?
"""
        result = await self.judge_model.generate(prompt)
        return self.parse_score(item["request_id"], result)
```

Sample 5% of normal traffic, but evaluate 100% of requests that trigger guardrails. This gives you broad coverage without the cost of evaluating everything.
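The sampling decision itself is cheap enough to run inline on every request; only the judge call is deferred to the background worker. A standalone sketch of the decision logic (the `rng` parameter is an illustrative hook that makes the function testable):

```python
import random

def should_sample(sample_rate: float, guardrail_triggered: bool, rng=random.random) -> bool:
    """Evaluate every guardrail-triggered request, and a random fraction of the rest."""
    if guardrail_triggered:
        return True
    return rng() < sample_rate

# Guardrail hits always go to the judge; normal traffic is sampled at 5%.
decisions = [should_sample(0.05, triggered) for triggered in (True, False, False)]
```

Because the sampled set is random, aggregate judge scores over a day remain an unbiased estimate of overall quality, while the guardrail-triggered set gives you complete coverage of the riskiest traffic.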
LLM observability isn't a nice-to-have—it's the foundation that makes everything else work. Without it, guardrails fail silently, costs spiral unnoticed, and model drift goes undetected.
If you're running an LLM application without this level of observability, you're flying blind. The cost optimization case study showed what happens when teams lack visibility—and how quickly things improve once they have it. For a systematic approach to finding the vulnerabilities that monitoring should catch, the red teaming methodology complements instrumentation. And once you have observability in place, the next step is hardening the API endpoints that expose your LLM to the outside world. The investment in instrumentation pays for itself in the first week.