Best Practices
API best practices ensure reliable, secure, and maintainable APIs that provide excellent developer experience. Following established patterns and conventions reduces complexity and accelerates adoption.
Design Principles
RESTful Design
Resource-Oriented URLs
# Good - Resource-oriented, hierarchical
GET /api/v1/users # List users
GET /api/v1/users/123 # Get specific user
POST /api/v1/users # Create user
PUT /api/v1/users/123 # Update user
DELETE /api/v1/users/123 # Delete user
# Nested resources
GET /api/v1/users/123/orders # User's orders
GET /api/v1/orders/456/items # Order items
# Bad - Verb-oriented, non-standard
GET /api/v1/getUsers
POST /api/v1/createUser
GET /api/v1/user/delete/123
HTTP Methods Usage
from enum import Enum
from typing import List, Optional
from fastapi import FastAPI, HTTPException, status
class HTTPMethod(Enum):
    """HTTP verbs used by the API, annotated with safety and idempotency."""

    # Retrieve resource(s): safe and idempotent.
    GET = "GET"
    # Create a resource: neither safe nor idempotent.
    POST = "POST"
    # Replace a resource: not safe, but idempotent.
    PUT = "PUT"
    # Partially update a resource: neither safe nor idempotent.
    PATCH = "PATCH"
    # Remove a resource: not safe, but idempotent.
    DELETE = "DELETE"
# Proper HTTP method implementation
@app.get("/users/{user_id}")
async def get_user(user_id: str):
    """Handle GET /users/{user_id}: return the user or answer 404.

    The handler only reads through ``user_service.get_user``.
    """
    found = await user_service.get_user(user_id)
    if found:
        return found
    # A falsy lookup result is reported as "not found".
    raise HTTPException(status_code=404, detail="User not found")
@app.post("/users", status_code=status.HTTP_201_CREATED)
async def create_user(user_data: UserCreate):
    """Handle POST /users: create a user, answering 409 if the email is taken.

    NOTE(review): the lookup and the insert are two separate awaits, so two
    concurrent requests could both pass the duplicate check - confirm the
    storage layer also enforces uniqueness.
    """
    duplicate = await user_service.get_user_by_email(user_data.email)
    if not duplicate:
        return await user_service.create_user(user_data)
    raise HTTPException(status_code=409, detail="User already exists")
@app.put("/users/{user_id}")
async def replace_user(user_id: str, user_data: UserUpdate):
    """Handle PUT /users/{user_id}: replace the whole user representation.

    The request body is handed to ``user_service.replace_user`` unchanged and
    whatever that call returns becomes the response.
    """
    replaced = await user_service.replace_user(user_id, user_data)
    return replaced
@app.patch("/users/{user_id}")
async def update_user(user_id: str, updates: dict):
    """Handle PATCH /users/{user_id}: apply a partial update.

    NOTE(review): ``updates`` is a free-form dict that is forwarded as-is;
    presumably ``user_service.update_user`` restricts which fields may be
    changed - verify.
    """
    patched = await user_service.update_user(user_id, updates)
    return patched
@app.delete("/users/{user_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_user(user_id: str):
    """Handle DELETE /users/{user_id}: 204 on success, 404 otherwise."""
    deleted = await user_service.delete_user(user_id)
    if deleted:
        # Nothing to return: the route is declared with a 204 status.
        return None
    raise HTTPException(status_code=404, detail="User not found")
Consistent Error Handling
Standard Error Format
from pydantic import BaseModel
from typing import Optional, Dict, Any
class ErrorDetail(BaseModel):
    """One entry in an error response's ``details`` list."""

    # Name/path of the offending field, when the problem is field-specific.
    field: Optional[str] = None
    # Human-readable description of the problem (always required).
    message: str
    # Optional machine-readable code for this entry.
    code: Optional[str] = None
class APIError(BaseModel):
    """Standard error envelope returned by the API."""

    # Machine-readable error category, e.g. "VALIDATION_ERROR".
    error: str
    # Human-readable summary of what went wrong.
    message: str
    # Optional per-field breakdown of the failure.
    details: Optional[List[ErrorDetail]] = None
    # Correlation id used to find the request in logs.
    trace_id: Optional[str] = None
    # ISO 8601 timestamp of when the error was produced.
    timestamp: str

    @classmethod
    def validation_error(
        cls,
        field_errors: Dict[str, str],
        trace_id: Optional[str] = None,
    ) -> "APIError":
        """Build a VALIDATION_ERROR envelope from a field -> message mapping.

        Args:
            field_errors: Mapping of field name/path to its error message.
            trace_id: Optional correlation id to echo back to the client.

        Returns:
            An ``APIError`` with one ``ErrorDetail`` per entry of
            ``field_errors`` and a timezone-aware UTC timestamp.
        """
        # Local import: only ``datetime``/``timedelta`` are imported at file
        # level, and ``timezone`` is needed for an aware timestamp.
        from datetime import datetime, timezone

        details = [
            ErrorDetail(field=field, message=message, code="VALIDATION_ERROR")
            for field, message in field_errors.items()
        ]
        return cls(
            error="VALIDATION_ERROR",
            message="Request validation failed",
            details=details,
            trace_id=trace_id,
            # ``datetime.utcnow()`` is deprecated and returns a naive value;
            # an aware timestamp carries its UTC offset explicitly.
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
# Global exception handler
@app.exception_handler(ValidationError)
async def validation_exception_handler(request: Request, exc: ValidationError):
    """Translate a validation failure into the standard 422 error envelope.

    NOTE(review): FastAPI reports request-validation failures as
    ``RequestValidationError``; confirm which ``ValidationError`` is in scope
    here so this handler is actually triggered for bad requests.
    """
    # Reuse the caller's request id when present, otherwise mint a new one.
    trace_id = request.headers.get("X-Request-ID", str(uuid.uuid4()))

    # Flatten each error location into a dotted path; if a path occurs more
    # than once, the later message replaces the earlier one.
    field_errors = {
        ".".join(str(part) for part in err["loc"]): err["msg"]
        for err in exc.errors()
    }

    envelope = APIError.validation_error(field_errors, trace_id)
    return JSONResponse(status_code=422, content=envelope.dict())
@app.exception_handler(HTTPException)
async def http_exception_handler(request: Request, exc: HTTPException):
    """Render an ``HTTPException`` in the standard error envelope.

    The exception's status code and detail are preserved, and any headers
    attached to the exception (for example ``WWW-Authenticate`` or
    ``Retry-After``) are forwarded instead of being silently dropped.
    """
    # Local import: ``timezone`` is not imported at file level.
    from datetime import datetime, timezone

    # Reuse the caller's request id when present, otherwise mint a new one.
    trace_id = request.headers.get("X-Request-ID", str(uuid.uuid4()))
    return JSONResponse(
        status_code=exc.status_code,
        content={
            "error": "HTTP_ERROR",
            "message": exc.detail,
            "trace_id": trace_id,
            # Timezone-aware UTC timestamp (``utcnow()`` is deprecated/naive).
            "timestamp": datetime.now(timezone.utc).isoformat(),
        },
        # Overriding the default handler must not lose exception headers.
        headers=getattr(exc, "headers", None),
    )
Input Validation and Sanitization
Request Validation
from pydantic import BaseModel, validator, EmailStr
from typing import Optional, List
import re
class UserCreateRequest(BaseModel):
    """Payload for creating a user, with per-field validation and clean-up."""

    email: EmailStr
    name: str
    phone: Optional[str] = None
    age: Optional[int] = None
    tags: Optional[List[str]] = None

    @validator('name')
    def validate_name(cls, v):
        """Return the trimmed name; reject blank, 1-char, >100-char or tagged input."""
        trimmed = v.strip() if v else ''
        if not trimmed:
            raise ValueError('Name cannot be empty')
        length = len(trimmed)
        if length < 2:
            raise ValueError('Name must be at least 2 characters')
        if length > 100:
            raise ValueError('Name cannot exceed 100 characters')
        # Anything that looks like an HTML/script tag is refused outright.
        if re.search(r'<[^>]*>', trimmed):
            raise ValueError('Name contains invalid characters')
        return trimmed

    @validator('phone')
    def validate_phone(cls, v):
        """Reduce the phone number to its digits and require 10-15 of them."""
        if v is None:
            return None
        # Keep only the digit characters.
        digits_only = ''.join(re.findall(r'\d', v))
        digit_count = len(digits_only)
        if digit_count < 10:
            raise ValueError('Phone number must have at least 10 digits')
        if digit_count > 15:
            raise ValueError('Phone number cannot exceed 15 digits')
        return digits_only

    @validator('age')
    def validate_age(cls, v):
        """Accept a missing age or one in the inclusive range 0-150."""
        if v is not None:
            if v < 0:
                raise ValueError('Age cannot be negative')
            if v > 150:
                raise ValueError('Age cannot exceed 150')
        return v

    @validator('tags')
    def validate_tags(cls, v):
        """Allow at most 10 tags and return the sanitized, non-empty ones.

        Non-string items, tags that become empty after clean-up and tags
        longer than 50 characters are dropped rather than rejected.
        """
        if v is None:
            return v
        if len(v) > 10:
            raise ValueError('Cannot have more than 10 tags')
        # Strip everything except word characters, hyphens and whitespace.
        cleaned = (
            re.sub(r'[^\w\-\s]', '', tag.strip())
            for tag in v
            if isinstance(tag, str)
        )
        return [tag for tag in cleaned if tag and len(tag) <= 50]
# Request size limits
from fastapi import Request
from fastapi.responses import JSONResponse
@app.middleware("http")
async def limit_upload_size(request: Request, call_next):
    """Reject requests whose declared body size exceeds 10 MB.

    A ``Content-Length`` header that is not a non-negative integer is answered
    with 400 instead of surfacing as an unhandled ``ValueError`` (HTTP 500).

    NOTE: only the declared length is inspected; requests that send no
    ``Content-Length`` header are passed through without a size check.
    """
    max_size = 10 * 1024 * 1024  # 10MB

    raw_length = request.headers.get("content-length")
    if raw_length is not None:
        try:
            content_length = int(raw_length)
        except ValueError:
            content_length = -1  # handled by the range check below

        if content_length < 0:
            return JSONResponse(
                status_code=400,
                content={
                    "error": "INVALID_CONTENT_LENGTH",
                    "message": "Content-Length header must be a non-negative integer"
                }
            )

        if content_length > max_size:
            return JSONResponse(
                status_code=413,
                content={
                    "error": "REQUEST_TOO_LARGE",
                    "message": f"Request size {content_length} exceeds maximum {max_size} bytes"
                }
            )

    return await call_next(request)
Security Best Practices
Authentication Patterns
JWT Implementation
import jwt
from datetime import datetime, timedelta
from typing import Optional
class JWTManager:
    """Issue and verify JWT access/refresh tokens signed with a shared secret."""

    def __init__(
        self,
        secret_key: str,
        algorithm: str = "HS256",
        issuer: str = "your-api-service",
    ):
        """Store signing settings and token lifetimes.

        Args:
            secret_key: Key used to sign and verify tokens.
            algorithm: JWT signing algorithm.
            issuer: Value written to the ``iss`` claim and required on decode.
        """
        self.secret_key = secret_key
        self.algorithm = algorithm
        self.issuer = issuer
        self.access_token_expire = timedelta(hours=1)
        self.refresh_token_expire = timedelta(days=30)

    @staticmethod
    def _utc_now() -> datetime:
        """Return the current time as a timezone-aware UTC datetime."""
        # Local import: ``timezone`` is not imported at file level.
        from datetime import timezone

        return datetime.now(timezone.utc)

    def create_access_token(self, user_id: str, permissions: List[str]) -> str:
        """Create JWT access token with short expiration."""
        issued_at = self._utc_now()
        payload = {
            "sub": user_id,
            "permissions": permissions,
            "type": "access",
            "iat": issued_at,
            "exp": issued_at + self.access_token_expire,
            "iss": self.issuer,
        }
        return jwt.encode(payload, self.secret_key, algorithm=self.algorithm)

    def create_refresh_token(self, user_id: str) -> str:
        """Create JWT refresh token with long expiration."""
        issued_at = self._utc_now()
        payload = {
            "sub": user_id,
            "type": "refresh",
            "iat": issued_at,
            "exp": issued_at + self.refresh_token_expire,
            "iss": self.issuer,
        }
        return jwt.encode(payload, self.secret_key, algorithm=self.algorithm)

    def verify_token(self, token: str, token_type: str = "access") -> Optional[dict]:
        """Verify and decode a JWT.

        Args:
            token: Encoded JWT.
            token_type: Expected value of the ``type`` claim.

        Returns:
            The decoded payload, or ``None`` when the token is invalid,
            expired, issued by someone else, or of the wrong type.
        """
        try:
            payload = jwt.decode(
                token,
                self.secret_key,
                algorithms=[self.algorithm],
                # The ``iss`` claim is written on creation, so enforce it here.
                issuer=self.issuer,
                options={"verify_exp": True},
            )
        except jwt.InvalidTokenError:
            # Base class of the expiry, issuer and signature errors.
            return None

        if payload.get("type") != token_type:
            return None
        return payload
# Token refresh endpoint
@app.post("/auth/refresh")
async def refresh_token(refresh_token: str = Body(...)):
    """Exchange a valid refresh token for a new access token.

    Answers 401 when the refresh token does not verify or when its subject
    no longer resolves to a user.
    """
    jwt_manager = JWTManager(settings.JWT_SECRET)

    # Verify refresh token
    payload = jwt_manager.verify_token(refresh_token, "refresh")
    if not payload:
        raise HTTPException(status_code=401, detail="Invalid refresh token")

    user_id = payload["sub"]

    # Permissions are re-read from the user record so the new access token
    # reflects the current permissions rather than any held at login time.
    user = await user_service.get_user(user_id)
    if not user:
        raise HTTPException(status_code=401, detail="User not found")

    # Create new access token
    new_access_token = jwt_manager.create_access_token(
        user_id=user_id,
        permissions=user.permissions
    )

    return {
        "access_token": new_access_token,
        "token_type": "bearer",
        # Derived from the manager so it cannot drift from the real lifetime.
        "expires_in": int(jwt_manager.access_token_expire.total_seconds())
    }
Rate Limiting
Advanced Rate Limiting
from typing import Dict, Optional
import asyncio
import time
from collections import defaultdict, deque
class TokenBucketRateLimiter:
    """In-memory token-bucket rate limiter keyed by an arbitrary string."""

    def __init__(self):
        # One state dict per key: "tokens", "last_refill", "max_tokens".
        self.buckets: Dict[str, dict] = defaultdict(dict)
        self.cleanup_task = None

    async def is_allowed(
        self,
        key: str,
        max_requests: int,
        window_seconds: int,
        burst_allowance: Optional[int] = None
    ) -> tuple[bool, dict]:
        """
        Token bucket rate limiting with burst allowance.

        Args:
            key: Unique identifier (user_id, api_key, etc.)
            max_requests: Maximum requests per window (must be > 0)
            window_seconds: Time window in seconds (must be > 0)
            burst_allowance: Additional burst capacity

        Returns:
            (is_allowed, metadata) where metadata holds "remaining",
            "retry_after" (seconds, 0 when allowed, at least 1 when denied)
            and "reset_time" (epoch seconds, now + window).

        Raises:
            ValueError: If max_requests or window_seconds is not positive.
        """
        import math  # local import: math is not imported at file level

        if max_requests <= 0 or window_seconds <= 0:
            raise ValueError("max_requests and window_seconds must be positive")

        now = time.time()
        bucket = self.buckets[key]

        # Initialize bucket if new
        if "last_refill" not in bucket:
            bucket.update({
                "tokens": max_requests,
                "last_refill": now,
                "max_tokens": max_requests + (burst_allowance or 0)
            })

        # Refill in proportion to elapsed time. The elapsed time is clamped
        # at zero so a backwards wall-clock step cannot remove tokens.
        time_passed = max(0.0, now - bucket["last_refill"])
        tokens_to_add = (time_passed * max_requests) / window_seconds

        # Add tokens (up to max capacity)
        bucket["tokens"] = min(
            bucket["max_tokens"],
            bucket["tokens"] + tokens_to_add
        )
        bucket["last_refill"] = now

        # Spend one token if a whole token is available.
        allowed = bucket["tokens"] >= 1
        if allowed:
            bucket["tokens"] -= 1
            retry_after = 0
        else:
            # Seconds until one full token exists, rounded UP so a denied
            # client is never told to retry after 0 seconds.
            missing = 1 - bucket["tokens"]
            retry_after = max(1, math.ceil(missing * window_seconds / max_requests))

        metadata = {
            "remaining": int(bucket["tokens"]),
            "retry_after": retry_after,
            "reset_time": int(now + window_seconds)
        }
        return allowed, metadata
# Rate limiting middleware
from fastapi import Request, Response
from fastapi.responses import JSONResponse
rate_limiter = TokenBucketRateLimiter()


@app.middleware("http")
async def rate_limit_middleware(request: Request, call_next):
    """Apply per-client, per-tier rate limits to every request.

    Health and metrics endpoints are exempt. Buckets are keyed by client and
    limit tier, not by the raw URL path, so all URLs in a tier share one
    budget and the number of buckets does not grow with distinct paths.
    """
    # Skip rate limiting for health checks
    if request.url.path in ["/health", "/metrics"]:
        return await call_next(request)

    # Identify client (could be user_id, api_key, IP, etc.)
    # NOTE(review): the X-API-Key value is used as an identity without being
    # validated here - presumably authentication happens elsewhere; verify.
    # ``request.client`` can be None, so fall back to a shared bucket.
    client_host = request.client.host if request.client else "unknown"
    client_id = request.headers.get("X-API-Key") or client_host

    # Different limits for different endpoint families
    endpoint = request.url.path
    if endpoint.startswith("/api/v1/analytics"):
        tier, max_requests, window = "analytics", 50, 3600  # 50 per hour
    elif endpoint.startswith("/api/v1/upload"):
        tier, max_requests, window = "upload", 10, 3600  # 10 per hour
    else:
        tier, max_requests, window = "default", 1000, 3600  # 1000 per hour

    # Check rate limit
    allowed, metadata = await rate_limiter.is_allowed(
        key=f"{client_id}:{tier}",
        max_requests=max_requests,
        window_seconds=window,
        burst_allowance=max_requests // 10  # 10% burst allowance
    )

    if not allowed:
        return JSONResponse(
            status_code=429,
            content={
                "error": "RATE_LIMIT_EXCEEDED",
                "message": "Too many requests",
                "retry_after": metadata["retry_after"]
            },
            headers={
                "Retry-After": str(metadata["retry_after"]),
                "X-RateLimit-Remaining": "0",
                "X-RateLimit-Reset": str(metadata["reset_time"])
            }
        )

    # Process request
    response = await call_next(request)

    # Add rate limit headers
    response.headers["X-RateLimit-Remaining"] = str(metadata["remaining"])
    response.headers["X-RateLimit-Reset"] = str(metadata["reset_time"])
    return response
Performance Optimization
Caching Strategies
Multi-Level Caching
import asyncio
import json
import hashlib
from typing import Any, Optional, Union
from dataclasses import dataclass
from datetime import datetime, timedelta
@dataclass
class CacheEntry:
    """A value held in the in-memory (L1) cache plus its bookkeeping."""

    value: Any
    expires_at: datetime
    created_at: datetime
    hit_count: int = 0


class MultiLevelCache:
    """Two-level cache: an in-process dict (L1) in front of optional Redis (L2)."""

    def __init__(self, redis_client=None):
        # L1 cache. Dict insertion order doubles as recency order:
        # the first key is the least recently used one.
        self.memory_cache = {}
        self.redis_client = redis_client  # L2 cache
        self.max_memory_items = 1000
        self.default_ttl = 3600  # 1 hour

    def _generate_key(self, key: str, **kwargs) -> str:
        """Generate cache key with parameters."""
        if kwargs:
            param_str = json.dumps(kwargs, sort_keys=True)
            # md5 is used only to shorten the key, not for security.
            key_hash = hashlib.md5(param_str.encode()).hexdigest()[:8]
            return f"{key}:{key_hash}"
        return key

    async def get(self, key: str, **kwargs) -> Optional[Any]:
        """Get value from cache (L1 -> L2).

        Returns ``None`` on a miss, so a stored ``None`` cannot be told apart
        from a miss.
        """
        cache_key = self._generate_key(key, **kwargs)

        # Try L1 cache first
        entry = self.memory_cache.get(cache_key)
        if entry is not None:
            if datetime.now() < entry.expires_at:
                entry.hit_count += 1
                # Re-insert so this key becomes the most recently used.
                self.memory_cache[cache_key] = self.memory_cache.pop(cache_key)
                return entry.value
            # Expired, remove from L1
            del self.memory_cache[cache_key]

        # Try L2 cache (Redis)
        if self.redis_client:
            try:
                cached_data = await self.redis_client.get(cache_key)
                if cached_data:
                    value = json.loads(cached_data)
                    # Promote to L1 for faster access.
                    # NOTE: promotion uses default_ttl, not the remaining
                    # Redis TTL, so L1 may outlive the L2 entry.
                    self._store_in_memory(cache_key, value, self.default_ttl)
                    return value
            except Exception as e:
                # Best effort: a Redis failure is treated as a cache miss.
                print(f"Redis cache error: {e}")
        return None

    async def set(self, key: str, value: Any, ttl: int = None, **kwargs) -> None:
        """Set value in cache (both L1 and L2)."""
        cache_key = self._generate_key(key, **kwargs)
        ttl = ttl or self.default_ttl

        # Store in L1 cache
        self._store_in_memory(cache_key, value, ttl)

        # Store in L2 cache (Redis)
        if self.redis_client:
            try:
                await self.redis_client.setex(
                    cache_key,
                    ttl,
                    # NOTE: default=str stringifies non-JSON types, so a value
                    # read back from L2 may differ in type from the L1 copy.
                    json.dumps(value, default=str)
                )
            except Exception as e:
                # Best effort: the L1 copy is still usable.
                print(f"Redis cache error: {e}")

    def _store_in_memory(self, key: str, value: Any, ttl: int) -> None:
        """Store in L1 memory cache with LRU eviction."""
        # Overwriting an existing key must not evict an unrelated entry;
        # dropping it first also moves the key to the most-recent position.
        self.memory_cache.pop(key, None)

        if len(self.memory_cache) >= self.max_memory_items:
            # Reclaim expired entries before evicting live ones.
            now = datetime.now()
            expired_keys = [
                k for k, entry in self.memory_cache.items()
                if entry.expires_at <= now
            ]
            for expired_key in expired_keys:
                del self.memory_cache[expired_key]

        # Evict least recently used entries until there is room.
        while self.memory_cache and len(self.memory_cache) >= self.max_memory_items:
            del self.memory_cache[next(iter(self.memory_cache))]

        created = datetime.now()
        self.memory_cache[key] = CacheEntry(
            value=value,
            expires_at=created + timedelta(seconds=ttl),
            created_at=created
        )
# Caching decorator
def cached(ttl: int = 3600, key_prefix: str = None):
    """Decorator factory that caches an async function's result.

    Args:
        ttl: Time-to-live, in seconds, passed to ``cache.set``.
        key_prefix: Prefix for the cache key; defaults to the function name.

    The key is derived from a SHA-256 digest of the call arguments. The
    built-in ``hash()`` is salted per process for strings, so keys built
    from it never match across workers or restarts and the shared L2 cache
    is never hit.

    NOTE: a ``None`` result is treated as a miss and is recomputed each call.
    """
    # Local import so the decorator works wherever it is applied in the file.
    from functools import wraps

    def decorator(func):
        func_name = key_prefix or func.__name__

        @wraps(func)
        async def wrapper(*args, **kwargs):
            # Sort kwargs so keyword order does not change the key.
            call_signature = repr((args, sorted(kwargs.items())))
            digest = hashlib.sha256(call_signature.encode()).hexdigest()[:16]
            cache_key = f"{func_name}:{digest}"

            # Try to get from cache
            cached_result = await cache.get(cache_key)
            if cached_result is not None:
                return cached_result

            # Execute function and cache result
            result = await func(*args, **kwargs)
            await cache.set(cache_key, result, ttl)
            return result
        return wrapper
    return decorator
# Usage example
cache = MultiLevelCache(redis_client)


@cached(ttl=1800, key_prefix="user_profile")
async def get_user_profile(user_id: str, include_orders: bool = False):
    """Fetch a user's profile, optionally with orders; cached for 1800 seconds."""
    profile = await user_service.get_profile(user_id)
    if not include_orders:
        return profile
    # Attach the orders to the profile under the "orders" key.
    profile["orders"] = await order_service.get_user_orders(user_id)
    return profile
Database Optimization
Query Optimization
from sqlalchemy import select, func, and_, or_
from sqlalchemy.orm import selectinload, joinedload
from typing import List, Optional
class OptimizedUserRepository:
    """Read-side queries for ``User`` built with SQLAlchemy Core constructs."""

    def __init__(self, session):
        self.session = session

    @staticmethod
    def _escape_like(term: str) -> str:
        """Escape LIKE wildcards so user input is matched literally."""
        return (
            term.replace("\\", "\\\\")
            .replace("%", "\\%")
            .replace("_", "\\_")
        )

    async def get_users_optimized(
        self,
        filters: Optional[dict] = None,
        include_orders: bool = False,
        limit: int = 100,
        offset: int = 0
    ) -> List[User]:
        """
        Query users with optional filtering, eager loading and pagination.

        Args:
            filters: Optional mapping; recognised keys are "email",
                "created_after", "created_before", "status" (a single value
                or an iterable of values) and "search" (substring match on
                name or email).
            include_orders: Eager-load ``User.orders`` via ``selectinload``.
            limit: Maximum number of rows to return.
            offset: Number of rows to skip.

        Returns:
            Users ordered by ``created_at`` descending.
        """
        # Build base query
        query = select(User)

        # Add eager loading if needed
        if include_orders:
            query = query.options(selectinload(User.orders))

        # Apply filters
        if filters:
            conditions = []

            if "email" in filters:
                conditions.append(User.email == filters["email"])

            if "created_after" in filters:
                conditions.append(User.created_at >= filters["created_after"])
            if "created_before" in filters:
                conditions.append(User.created_at <= filters["created_before"])

            if "status" in filters:
                statuses = filters["status"]
                # ``in_`` needs a collection; accept a single status too.
                if isinstance(statuses, str):
                    statuses = [statuses]
                conditions.append(User.status.in_(statuses))

            if "search" in filters:
                # Escape % and _ so they are matched literally instead of
                # acting as caller-controlled wildcards.
                search_term = f"%{self._escape_like(str(filters['search']))}%"
                conditions.append(
                    or_(
                        User.name.ilike(search_term, escape="\\"),
                        User.email.ilike(search_term, escape="\\")
                    )
                )

            if conditions:
                query = query.where(and_(*conditions))

        # Add ordering
        query = query.order_by(User.created_at.desc())

        # Add pagination
        query = query.offset(offset).limit(limit)

        # Execute query
        result = await self.session.execute(query)
        return result.scalars().all()

    async def get_user_statistics(self, user_ids: Optional[List[str]] = None) -> dict:
        """Aggregate user statistics in a single query.

        Args:
            user_ids: Restrict the statistics to these ids. ``None`` means
                all users; an empty list means no users.

        Returns:
            Dict with "total_users", "unique_emails" and
            "avg_days_since_creation".
        """
        query = select(
            func.count(User.id).label("total_users"),
            func.count(func.distinct(User.email)).label("unique_emails"),
            # ``func.extract('epoch', ...)`` renders as a comma-style call,
            # which PostgreSQL rejects (EXTRACT needs ``field FROM expr``);
            # ``date_part`` is the equivalent ordinary function.
            func.avg(
                func.date_part('epoch', func.now() - User.created_at) / 86400
            ).label("avg_days_since_creation")
        )

        # Only ``None`` means "no restriction"; an empty list must not
        # silently widen the query to every user.
        if user_ids is not None:
            query = query.where(User.id.in_(user_ids))

        result = await self.session.execute(query)
        return result.first()._asdict()
# Database connection pooling
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.pool import QueuePool
class DatabaseManager:
    """Owns the async SQLAlchemy engine and hands out sessions."""

    def __init__(self, database_url: str):
        """Create the async engine with connection pooling configured.

        ``poolclass`` is deliberately not set: ``create_async_engine``
        rejects the synchronous ``QueuePool`` and already defaults to the
        asyncio-compatible queue pool, to which the sizing options apply.
        """
        self.engine = create_async_engine(
            database_url,
            # Connection pooling configuration
            pool_size=20,  # Number of connections to maintain
            max_overflow=30,  # Additional connections allowed
            pool_pre_ping=True,  # Validate connections before use
            pool_recycle=3600,  # Recycle connections after 1 hour
            echo=False,  # Set to True for SQL debugging
            # Driver-level settings
            # NOTE(review): these keys look asyncpg/PostgreSQL specific -
            # confirm they match the configured driver.
            connect_args={
                "command_timeout": 30,  # 30 second query timeout
                "server_settings": {
                    "jit": "off",  # Disable JIT for faster simple queries
                }
            }
        )

    async def get_session(self) -> AsyncSession:
        """Get database session with proper configuration."""
        # ``autocommit`` is not passed: SQLAlchemy 2.x sessions no longer
        # accept it, and explicit transactions are already the default.
        return AsyncSession(
            self.engine,
            expire_on_commit=False,  # Don't expire objects after commit
            autoflush=False,  # Manual control over flushing
        )
# Query monitoring
import time
from functools import wraps
def monitor_query_performance(func):
    """Decorator that times an async query function.

    Calls slower than one second are logged as warnings, every successful
    call is recorded in ``QUERY_DURATION``, and failures are logged with
    their elapsed time before the exception is re-raised.
    """
    @wraps(func)
    async def wrapper(*args, **kwargs):
        # perf_counter is monotonic; wall-clock time.time() can jump and
        # produce negative or inflated durations.
        start_time = time.perf_counter()
        try:
            result = await func(*args, **kwargs)
        except Exception as e:
            duration = time.perf_counter() - start_time
            logger.error(
                f"Query failed: {func.__name__} failed after {duration:.2f}s - {str(e)}"
            )
            raise
        else:
            # Kept outside the ``try`` so a logging/metrics error is not
            # misreported as a query failure.
            duration = time.perf_counter() - start_time

            # Log slow queries
            if duration > 1.0:  # Log queries taking more than 1 second
                logger.warning(
                    f"Slow query detected: {func.__name__} took {duration:.2f}s"
                )

            # Track metrics
            QUERY_DURATION.labels(
                function=func.__name__
            ).observe(duration)
            return result
    return wrapper
Documentation Standards
API Documentation Best Practices
Comprehensive OpenAPI Documentation
# Best practices for OpenAPI documentation
openapi: 3.0.3
info:
title: User Management API
version: 2.1.0
description: |
# User Management API
This API provides comprehensive user management capabilities including:
- User account creation and management
- Authentication and authorization
- Profile management
- User search and filtering
## Getting Started
1. Obtain an API key from the developer portal
2. Include the API key in the `X-API-Key` header
3. All requests must use HTTPS
4. Rate limits apply based on your subscription tier
## Response Format
All responses follow a consistent format:
```json
{
"data": {...},
"meta": {
"timestamp": "2024-01-15T10:30:00Z",
"version": "2.1.0"
}
}
```
## Error Handling
Errors include detailed information for troubleshooting:
```json
{
"error": "VALIDATION_ERROR",
"message": "Request validation failed",
"details": [...],
"trace_id": "req_abc123",
"timestamp": "2024-01-15T10:30:00Z"
}
```
contact:
  name: API Support Team
  email: api-support@example.com
  url: https://support.example.com
license:
  name: MIT
  url: https://opensource.org/licenses/MIT
servers:
  - url: https://api.example.com/v2
    description: Production server
  - url: https://staging-api.example.com/v2
    description: Staging server
paths: /users: get: summary: List users description: | Retrieve a paginated list of users with optional filtering.
Filtering Options
- Email: Exact match on email address
- Name: Partial match on user name
- Status: Filter by account status
- Date Range: Filter by creation date
Performance Notes
- Results are cached for 5 minutes
- Maximum 1000 results per request
- Use pagination for large datasets
Rate Limits
- Standard: 1000 requests/hour
- Premium: 10000 requests/hour
tags:
-
Users parameters:
-
name: email in: query description: | Filter users by email address (exact match).
Example:
user@example.com
schema: type: string format: email example: "john@example.com" -
name: name in: query description: | Filter users by name (partial match, case-insensitive).
Minimum 2 characters required. schema: type: string minLength: 2 maxLength: 100 example: "john"
-
name: limit in: query description: Number of users to return (max 1000) schema: type: integer minimum: 1 maximum: 1000 default: 50 example: 100
responses: '200': description: List of users retrieved successfully headers: X-Total-Count: description: Total number of users matching filters schema: type: integer example: 1250 X-Rate-Limit-Remaining: description: Requests remaining in current window schema: type: integer example: 995 content: application/json: schema: type: object properties: data: type: array items: ref: '#/components/schemas/User' meta: ref: '#/components/schemas/ResponseMeta' examples: successful_response: summary: Successful user list response description: Example response with multiple users value: data:
- id: "user_123" email: "john@example.com" name: "John Doe" status: "active" created_at: "2024-01-15T10:30:00Z" meta: timestamp: "2024-01-15T10:35:00Z" version: "2.1.0" total: 1250
empty_response: summary: Empty result set description: No users match the specified criteria value: data: [] meta: timestamp: "2024-01-15T10:35:00Z" version: "2.1.0" total: 0
'400':
  $ref: '#/components/responses/BadRequest'
'401':
  $ref: '#/components/responses/Unauthorized'
'429':
  $ref: '#/components/responses/TooManyRequests'
components: schemas: User: type: object required:
-
id
-
email
-
name
-
status
-
created_at
properties:
  id:
    type: string
    description: |
      Unique user identifier.
      Format: `user_` followed by 20 alphanumeric characters
    pattern: '^user_[a-zA-Z0-9]{20}$'
    example: "user_abc123def456ghij7890"
email: type: string format: email description: User's email address (unique across system) maxLength: 255 example: "john.doe@example.com"
name: type: string description: User's full name minLength: 1 maxLength: 100 example: "John Doe"
status:
  type: string
  enum: [active, suspended, pending_verification]
  description: |
    Current account status:
    - `active`: User can access all features
    - `suspended`: Account temporarily disabled
    - `pending_verification`: Email verification required
  example: "active"
created_at: type: string format: date-time description: Account creation timestamp (ISO 8601) example: "2024-01-15T10:30:00Z"
last_login_at: type: string format: date-time nullable: true description: Last login timestamp (null if never logged in) example: "2024-01-16T14:22:00Z"
example: id: "user_abc123def456ghij7890" email: "john.doe@example.com" name: "John Doe" status: "active" created_at: "2024-01-15T10:30:00Z" last_login_at: "2024-01-16T14:22:00Z"
Following these best practices ensures APIs are secure, performant, maintainable, and provide excellent developer experience. Consistent application of these patterns across all APIs creates predictable interfaces that developers can easily understand and integrate with.