API Security Patterns & Anti-Patterns
Reference for securing REST APIs, webhooks, and service-to-service communication. Use during
007 audit,007 threat-model, and code reviews of API code.
1. Authentication Patterns
API Keys
# GOOD: API key in header Authorization: ApiKey sk-live-abc123def456 # BAD: API key in URL (logged in server logs, browser history, referrer headers) GET /api/data?api_key=sk-live-abc123def456 # Best practices: api_keys: - Prefix keys for identification: sk-live-, sk-test-, pk- - Store hashed (SHA-256), not plaintext - Rotate regularly (90 days max) - Scope to specific permissions/resources - Rate limit per key - Revoke immediately on compromise - Different keys per environment (dev/staging/prod)
OAuth 2.0
# Recommended flows by client type oauth2_flows: server_to_server: client_credentials web_app_with_backend: authorization_code + PKCE single_page_app: authorization_code + PKCE (no client secret) mobile_app: authorization_code + PKCE NEVER_USE: implicit_grant # Deprecated, tokens exposed in URL # Token best practices tokens: access_token_lifetime: 15_minutes # Short-lived refresh_token_lifetime: 7_days # Rotate on use refresh_token_rotation: true # New refresh token each time store_tokens: httponly_secure_cookie # Not localStorage revocation: implement_revocation_endpoint
JWT Best Practices
# GOOD: Proper JWT configuration jwt_config = { "algorithm": "RS256", # Asymmetric, not HS256 with weak secret "expiration": 900, # 15 minutes max "issuer": "auth.example.com", # Always validate "audience": "api.example.com", # Always validate "required_claims": ["sub", "exp", "iat", "iss", "aud"], } # BAD patterns to detect jwt_antipatterns = [ "algorithm: none", # No signature verification "algorithm: HS256", # With weak/shared secret "exp: far_future", # Tokens that never expire "no audience check", # Token reuse across services "secret in code", # Hardcoded signing key "JWT in URL parameter", # Logged, cached, leaked via referrer ] # CRITICAL: Always validate def validate_jwt(token: str) -> dict: return jwt.decode( token, key=PUBLIC_KEY, # Not a weak shared secret algorithms=["RS256"], # Explicit, not from token header audience="api.example.com", issuer="auth.example.com", options={"require": ["exp", "iat", "sub"]}, )
2. Rate Limiting Strategies
Token Bucket
# Best for: Allowing bursts while maintaining average rate class TokenBucket: """ capacity=100, refill_rate=10/sec Allows burst of 100 requests, then 10/sec sustained. """ def __init__(self, capacity: int, refill_rate: float): self.capacity = capacity self.tokens = capacity self.refill_rate = refill_rate self.last_refill = time.time() def allow_request(self) -> bool: self._refill() if self.tokens >= 1: self.tokens -= 1 return True return False
Sliding Window
# Best for: Smooth rate limiting without burst allowance # Track requests in time windows, count requests in last N seconds # Redis implementation: ZADD + ZRANGEBYSCORE + ZCARD
Per-User Rate Limits
rate_limits: unauthenticated: requests_per_minute: 20 requests_per_hour: 100 authenticated_free: requests_per_minute: 60 requests_per_hour: 1000 authenticated_paid: requests_per_minute: 300 requests_per_hour: 10000 # Always include response headers headers: X-RateLimit-Limit: "60" X-RateLimit-Remaining: "45" X-RateLimit-Reset: "1620000060" # Unix timestamp Retry-After: "30" # On 429 response
3. Input Validation
Schema Validation
from pydantic import BaseModel, Field, validator class CreateUserRequest(BaseModel): name: str = Field(min_length=1, max_length=100) email: str = Field(regex=r"^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$") age: int = Field(ge=13, le=150) role: str = Field(default="user") # Ignore if user tries to set "admin" @validator("role") def restrict_role(cls, v): if v not in ("user", "viewer"): # Only allow safe roles return "user" return v class Config: extra = "forbid" # Reject unknown fields (prevent mass assignment)
Type Checking and Size Limits
validation_rules: string_fields: max_length: 10_000 # No unbounded strings strip_whitespace: true reject_null_bytes: true # \x00 can cause issues numeric_fields: define_min_max: true # Always set bounds reject_nan_infinity: true # Can break math operations array_fields: max_items: 100 # No unbounded arrays validate_each_item: true file_uploads: max_size: 10MB allowed_types: ["image/jpeg", "image/png", "application/pdf"] validate_magic_bytes: true # Don't trust Content-Type header alone scan_for_malware: true query_parameters: max_page_size: 100 default_page_size: 20 max_query_length: 500
4. Webhook Security
HMAC Signature Verification
import hmac import hashlib import time def verify_webhook(payload: bytes, headers: dict, secret: str) -> bool: """Full webhook verification: signature + timestamp.""" signature = headers.get("X-Webhook-Signature") timestamp = headers.get("X-Webhook-Timestamp") if not signature or not timestamp: return False # 1. Prevent replay attacks (5-minute window) if abs(time.time() - int(timestamp)) > 300: return False # 2. Compute expected signature signed_payload = f"{timestamp}.{payload.decode()}" expected = hmac.new( secret.encode(), signed_payload.encode(), hashlib.sha256 ).hexdigest() # 3. Constant-time comparison (prevents timing attacks) return hmac.compare_digest(f"sha256={expected}", signature)
Webhook Best Practices
webhook_security: sending: - Sign every payload with HMAC-SHA256 - Include timestamp in signature - Send unique event ID for idempotency - Use HTTPS only - Implement retry with exponential backoff - Rotate signing secrets periodically receiving: - Verify signature BEFORE any processing - Reject requests older than 5 minutes (replay protection) - Implement idempotency (store processed event IDs) - Return 200 quickly, process async - Don't trust payload data blindly (validate schema) - Rate limit incoming webhooks - Log all webhook events for audit
5. CORS Configuration
# DANGEROUS: Allow everything # Access-Control-Allow-Origin: * # Access-Control-Allow-Credentials: true # INVALID with * origin # SECURE: Explicit allowlist CORS_CONFIG = { "allowed_origins": [ "https://app.example.com", "https://admin.example.com", ], "allowed_methods": ["GET", "POST", "PUT", "DELETE"], "allowed_headers": ["Authorization", "Content-Type"], "allow_credentials": True, "max_age": 3600, # Preflight cache (1 hour) "expose_headers": ["X-RateLimit-Remaining"], } # Anti-patterns to detect cors_antipatterns = [ "Access-Control-Allow-Origin: *", # Too permissive "reflect Origin header as Allow-Origin", # Effectively * with credentials "Access-Control-Allow-Origin: null", # Exploitable "Allow-Origin without credentials but with auth", # Inconsistent ]
6. Security Headers Checklist
# Required security headers for all API responses security_headers: # Prevent MIME sniffing X-Content-Type-Options: "nosniff" # Prevent clickjacking (for HTML responses) X-Frame-Options: "DENY" # XSS protection (legacy browsers) X-XSS-Protection: "0" # Disable, use CSP instead # HTTPS enforcement Strict-Transport-Security: "max-age=31536000; includeSubDomains; preload" # Content Security Policy (for HTML responses) Content-Security-Policy: "default-src 'self'; script-src 'self'; style-src 'self'" # Referrer policy Referrer-Policy: "strict-origin-when-cross-origin" # Permissions policy Permissions-Policy: "camera=(), microphone=(), geolocation=()" # Remove server info headers Server: REMOVE_THIS_HEADER X-Powered-By: REMOVE_THIS_HEADER # Cache control for sensitive data Cache-Control: "no-store, no-cache, must-revalidate, private" Pragma: "no-cache"
7. Common API Vulnerabilities
BOLA / IDOR (Broken Object Level Authorization)
# VULNERABLE: No ownership check @app.get("/api/users/{user_id}/orders") def get_orders(user_id: int): return db.query(Order).filter(Order.user_id == user_id).all() # Any authenticated user can access any other user's orders # SECURE: Enforce ownership @app.get("/api/users/{user_id}/orders") def get_orders(user_id: int, current_user: User = Depends(get_current_user)): if current_user.id != user_id and not current_user.is_admin: raise HTTPException(403, "Forbidden") return db.query(Order).filter(Order.user_id == user_id).all()
Mass Assignment
# VULNERABLE: Accept all fields from request @app.put("/api/users/{user_id}") def update_user(user_id: int, data: dict): db.query(User).filter(User.id == user_id).update(data) # Attacker sends {"role": "admin", "is_verified": true} # SECURE: Explicit allowlist of updatable fields class UserUpdateRequest(BaseModel): name: str | None = None email: str | None = None # role and is_verified are NOT included @app.put("/api/users/{user_id}") def update_user(user_id: int, data: UserUpdateRequest): db.query(User).filter(User.id == user_id).update( data.dict(exclude_unset=True) )
Excessive Data Exposure
# VULNERABLE: Return entire database model @app.get("/api/users/{user_id}") def get_user(user_id: int): return db.query(User).get(user_id).__dict__ # Returns: id, name, email, password_hash, ssn, internal_notes, ... # SECURE: Explicit response schema class UserResponse(BaseModel): id: int name: str email: str # Only public fields @app.get("/api/users/{user_id}", response_model=UserResponse) def get_user(user_id: int): return db.query(User).get(user_id)
8. Idempotency Patterns
# Prevent duplicate processing of the same request # Essential for: payments, webhooks, any non-idempotent operation class IdempotencyMiddleware: """ Client sends: Idempotency-Key: unique-uuid-here Server stores result and returns cached response on retry. """ def __init__(self, cache): self.cache = cache # Redis or similar async def process(self, idempotency_key: str, handler): # 1. Check if already processed cached = await self.cache.get(f"idempotency:{idempotency_key}") if cached: return cached # Return same response as first time # 2. Lock to prevent concurrent duplicate processing lock = await self.cache.lock(f"lock:{idempotency_key}", timeout=30) if not lock: raise HTTPException(409, "Request already in progress") try: # 3. Process the request result = await handler() # 4. Cache the result (24h TTL) await self.cache.set( f"idempotency:{idempotency_key}", result, ttl=86400, ) return result finally: await lock.release()
When to Require Idempotency Keys
require_idempotency_key: - POST /payments - POST /transfers - POST /orders - POST /webhooks/* # Use event ID as key - Any non-idempotent mutation naturally_idempotent: # No key needed - GET (all) - PUT (full replacement) - DELETE (by ID)
Quick Security Review Checklist
Authentication: [ ] All endpoints require authentication (unless explicitly public) [ ] API keys are in headers, not URLs [ ] JWTs use RS256 with short expiry [ ] OAuth 2.0 with PKCE for public clients [ ] Token rotation implemented Authorization: [ ] Ownership check on every data access (BOLA prevention) [ ] Role check on every privileged operation [ ] Mass assignment protection (explicit field allowlists) [ ] Response schemas filter sensitive fields Input/Output: [ ] Schema validation on all inputs [ ] Size limits on all fields, arrays, and files [ ] Parameterized queries (no string concatenation) [ ] Generic error messages (no stack traces) Transport: [ ] HTTPS everywhere (TLS 1.2+) [ ] Security headers set [ ] CORS explicitly configured [ ] HSTS enabled Operations: [ ] Rate limiting per user/IP [ ] Request logging with correlation IDs [ ] Webhook signatures verified [ ] Idempotency keys for mutations [ ] Dependencies scanned for CVEs