Authentication & Authorization
API security through authentication (verifying identity) and authorization (controlling access) is fundamental to protecting data and services. This guide covers modern approaches to securing APIs in data engineering environments.
Authentication Methods
API Keys
Simple authentication using unique keys for each client.
from functools import wraps
from flask import request, jsonify
import hashlib
import secrets
class APIKeyAuth:
    """Issue API keys and look up the client record attached to a key.

    Known keys live in ``self.valid_keys``, an in-memory mapping from the key
    string to a dict holding ``client_id`` and ``permissions``.
    """

    def __init__(self):
        # In production, store in database with proper hashing
        self.valid_keys = {
            'sk_test_abc123': {'client_id': 'client_1', 'permissions': ['read', 'write']},
            'sk_prod_xyz789': {'client_id': 'client_2', 'permissions': ['read']},
        }

    def is_test_environment(self) -> bool:
        """Return True unless the process is marked as production.

        NOTE(review): the ``APP_ENV`` variable name is an assumption; the
        original snippet called this method without defining it. Point it at
        whatever setting the deployment really uses.
        """
        import os  # local import keeps this snippet self-contained

        environment = os.getenv('APP_ENV', 'test').strip().lower()
        return environment not in ('prod', 'production')

    def generate_api_key(self, client_id: str) -> str:
        """Generate a secure API key for a client.

        The key is ``sk_test_`` or ``sk_prod_`` followed by 32 random bytes
        encoded URL-safe. ``client_id`` is accepted for interface symmetry but
        is not embedded in the key, and the key is not stored.
        """
        prefix = "sk_" + ("test_" if self.is_test_environment() else "prod_")
        random_part = secrets.token_urlsafe(32)
        return prefix + random_part

    def validate_key(self, api_key: str) -> 'dict | None':
        """Validate API key and return client info, or None if it is unknown."""
        return self.valid_keys.get(api_key)
def require_api_key(f):
    """Decorate a view so it only runs for requests carrying a known API key.

    The key is taken from the ``X-API-Key`` header, or from the ``api_key``
    query parameter when the header is absent or empty. A missing or unknown
    key yields a 401 JSON response. Otherwise the client's id and permissions
    are attached to ``request`` and the wrapped view is called.
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        supplied_key = request.headers.get('X-API-Key')
        if not supplied_key:
            supplied_key = request.args.get('api_key')
        if not supplied_key:
            return jsonify({'error': 'API key required'}), 401

        client_record = APIKeyAuth().validate_key(supplied_key)
        if not client_record:
            return jsonify({'error': 'Invalid API key'}), 401

        # Expose the caller's identity to the view through the request object.
        request.client_id = client_record['client_id']
        request.permissions = client_record['permissions']
        return f(*args, **kwargs)

    return decorated_function
@app.route('/api/data')
@require_api_key
def get_data():
    """Return the data payload to callers whose key includes 'read'; else 403."""
    may_read = 'read' in request.permissions
    if may_read:
        return jsonify({'data': 'sensitive information'})
    return jsonify({'error': 'Insufficient permissions'}), 403
JWT (JSON Web Tokens)
Stateless authentication using signed tokens containing claims.
import jwt
from datetime import datetime, timedelta
from functools import wraps
import os
class JWTAuth:
    """Create and verify HS256-signed JSON Web Tokens.

    The signing secret is read from the ``JWT_SECRET_KEY`` environment
    variable. Tokens expire 24 hours after they are issued.
    """

    def __init__(self):
        # NOTE(review): the literal fallback keeps the example runnable, but a
        # deployment that forgets JWT_SECRET_KEY would sign with a publicly
        # known secret. Consider failing fast here in production.
        self.secret_key = os.getenv('JWT_SECRET_KEY', 'your-secret-key')
        self.algorithm = 'HS256'
        self.expiration_delta = timedelta(hours=24)

    def generate_token(self, user_id: str, permissions: list = None) -> str:
        """Generate JWT token for authenticated user.

        Args:
            user_id: Identifier stored in the ``user_id`` claim.
            permissions: Values stored in the ``permissions`` claim; ``None``
                or an empty list is stored as ``[]``.

        Returns:
            The encoded token produced by ``jwt.encode``.
        """
        from datetime import timezone  # local import: not in the snippet's import list

        # One timezone-aware clock read keeps ``iat`` and ``exp`` exactly
        # ``expiration_delta`` apart and avoids the deprecated utcnow().
        issued_at = datetime.now(timezone.utc)
        payload = {
            'user_id': user_id,
            'permissions': permissions or [],
            'exp': issued_at + self.expiration_delta,
            'iat': issued_at,
            'iss': 'your-api-service',
        }
        return jwt.encode(payload, self.secret_key, algorithm=self.algorithm)

    def validate_token(self, token: str) -> dict:
        """Validate and decode JWT token.

        Returns:
            The decoded claims.

        Raises:
            AuthenticationError: If the token has expired or is otherwise
                invalid. NOTE(review): ``AuthenticationError`` is not defined
                in the visible snippet; it must be defined or imported by the
                application.
        """
        try:
            return jwt.decode(
                token,
                self.secret_key,
                algorithms=[self.algorithm],
                options={'verify_exp': True},
            )
        except jwt.ExpiredSignatureError as err:
            # ExpiredSignatureError is handled first so expiry gets its own message.
            raise AuthenticationError('Token has expired') from err
        except jwt.InvalidTokenError as err:
            raise AuthenticationError('Invalid token') from err
def require_jwt_auth(required_permissions: list = None):
    """Build a decorator that requires a valid Bearer JWT on the request.

    Args:
        required_permissions: Permissions that must all appear in the token's
            ``permissions`` claim. ``None`` or an empty list skips the check.

    The wrapped view returns 401 when the ``Authorization`` header is missing,
    is not a Bearer header, or carries a token that fails validation or lacks
    a ``user_id`` claim. It returns 403 when a required permission is missing.
    On success ``request.user_id`` and ``request.permissions`` are set before
    the view runs.
    """
    def decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            auth_header = request.headers.get('Authorization')
            if not auth_header or not auth_header.startswith('Bearer '):
                return jsonify({'error': 'Bearer token required'}), 401
            token = auth_header.split(' ')[1]
            try:
                jwt_auth = JWTAuth()
                payload = jwt_auth.validate_token(token)
                # Read the claim defensively: a validly signed token may still
                # omit ``permissions`` or carry null.
                granted = payload.get('permissions') or []
                # Check permissions if required
                if required_permissions:
                    if not set(required_permissions).issubset(set(granted)):
                        return jsonify({'error': 'Insufficient permissions'}), 403
                user_id = payload.get('user_id')
                if user_id is None:
                    # Without an identity claim the token cannot be used here;
                    # answer 401 instead of failing with a KeyError.
                    return jsonify({'error': 'Invalid token'}), 401
                # Add user info to request context
                request.user_id = user_id
                request.permissions = granted
                return f(*args, **kwargs)
            except AuthenticationError as e:
                return jsonify({'error': str(e)}), 401
        return decorated_function
    return decorator
@app.route('/api/admin/users')
@require_jwt_auth(required_permissions=['admin'])
def admin_get_users():
    """Return the admin user listing; the decorator demands the 'admin' permission."""
    body = {'users': 'admin data'}
    return jsonify(body)
OAuth 2.0
Industry-standard authorization framework for secure API access.
from authlib.integrations.flask_oauth2 import ResourceProtector
from authlib.oauth2.rfc6749 import grants
from authlib.oauth2 import OAuth2Error
class OAuth2Handler:
    """Attach an Authlib OAuth 2.0 authorization server to a Flask app."""

    def __init__(self, app):
        """Store ``app`` and configure the authorization server immediately."""
        self.app = app
        self.setup_oauth2_server()

    def setup_oauth2_server(self):
        """Configure OAuth2 server with grant types.

        Creates ``self.authorization_server`` and registers the authorization
        code, refresh token and client credentials grants plus a token
        introspection endpoint.
        """
        from authlib.integrations.flask_oauth2 import AuthorizationServer
        from your_models import OAuth2Client, OAuth2Token

        self.authorization_server = AuthorizationServer(self.app)

        # Configure supported grant types
        for grant_class in (
            grants.AuthorizationCodeGrant,
            grants.RefreshTokenGrant,
            grants.ClientCredentialsGrant,
        ):
            self.authorization_server.register_grant(grant_class)

        # Token introspection. register_endpoint takes only the endpoint
        # class; Authlib reads the endpoint's name from the class itself.
        # NOTE(review): TokenIntrospectionEndpoint is not defined in the
        # visible snippet; the application must provide it.
        self.authorization_server.register_endpoint(TokenIntrospectionEndpoint)
# OAuth2 protected resource
require_oauth = ResourceProtector()


# ``app.route`` must be the outermost decorator. It registers whatever
# function it receives, so placing ``require_oauth`` above it would register
# the unprotected view and leave the endpoint open.
@app.route('/api/protected-data')
@require_oauth('read')
def protected_data():
    """Return a sample payload plus the calling client's id and token scopes."""
    # Access token is automatically validated
    current_token = require_oauth.acquire_token()
    client_id = current_token.client_id
    scopes = current_token.get_scopes()
    return jsonify({
        'message': 'This is protected data',
        'client_id': client_id,
        'scopes': scopes
    })
# Token endpoint
@app.route('/oauth/token', methods=['POST'])
def issue_token():
    """Issue a token response, or the OAuth2 error body with its status code."""
    # NOTE(review): ``authorization_server`` is read as a module-level name,
    # but the visible code only creates it as an OAuth2Handler attribute.
    # Confirm where it is bound.
    try:
        token_response = authorization_server.create_token_response()
        return token_response
    except OAuth2Error as error:
        error_body = error.get_body()
        return jsonify(error_body), error.status_code
Role-Based Access Control (RBAC)
Role Definition
from enum import Enum
from dataclasses import dataclass
from typing import List, Set
class Permission(Enum):
    """Closed set of permission strings in ``resource:action`` form."""

    # User management
    READ_USERS = "users:read"
    WRITE_USERS = "users:write"
    DELETE_USERS = "users:delete"

    # Data access
    READ_DATA = "data:read"
    WRITE_DATA = "data:write"

    # Administration and reporting
    ADMIN_ACCESS = "admin:access"
    ANALYTICS_VIEW = "analytics:view"
@dataclass
class Role:
    """A named bundle of permissions."""

    # Identifier used as the lookup key for the role.
    name: str
    # Permissions granted to holders of the role.
    permissions: Set[Permission]
    # Optional human-readable summary; empty when not given.
    description: str = ""
class RoleManager:
    """Registry of the built-in roles and permission checks against them."""

    def __init__(self):
        """Create the 'viewer', 'editor' and 'admin' roles."""
        viewer = Role(
            name='viewer',
            permissions={Permission.READ_USERS, Permission.READ_DATA, Permission.ANALYTICS_VIEW},
            description='Read-only access to users and data',
        )
        editor = Role(
            name='editor',
            permissions={
                Permission.READ_USERS, Permission.WRITE_USERS,
                Permission.READ_DATA, Permission.WRITE_DATA,
                Permission.ANALYTICS_VIEW,
            },
            description='Read and write access to users and data',
        )
        admin = Role(
            name='admin',
            permissions=set(Permission),  # All permissions
            description='Full administrative access',
        )
        self.roles = {'viewer': viewer, 'editor': editor, 'admin': admin}

    def get_role(self, role_name: str) -> Role:
        """Return the role registered under ``role_name``, or None if unknown."""
        return self.roles.get(role_name)

    def user_has_permission(self, user_roles: List[str], required_permission: Permission) -> bool:
        """Check if user has required permission based on their roles.

        Unknown role names are ignored. An empty role list yields False.
        """
        resolved = (self.get_role(role_name) for role_name in user_roles)
        return any(
            required_permission in role.permissions
            for role in resolved
            if role
        )
def require_permission(permission: Permission):
    """Build a decorator that returns 403 unless the caller's roles grant ``permission``.

    Roles are read from ``request.user_roles`` and treated as empty when that
    attribute is absent.
    """
    def decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            # Assume user roles are available in request context
            caller_roles = getattr(request, 'user_roles', [])
            allowed = RoleManager().user_has_permission(caller_roles, permission)
            if allowed:
                return f(*args, **kwargs)
            return jsonify({'error': 'Insufficient permissions'}), 403
        return decorated_function
    return decorator
@app.route('/api/users', methods=['DELETE'])
@require_jwt_auth()
@require_permission(Permission.DELETE_USERS)
def delete_user():
    """Acknowledge a user deletion for callers holding DELETE_USERS."""
    # NOTE(review): the permission check reads ``request.user_roles``; confirm
    # that something upstream populates it, otherwise this always returns 403.
    confirmation = {'message': 'User deleted'}
    return jsonify(confirmation)
Security Best Practices
Rate Limiting
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
import redis
# Redis-backed rate limiter
# Every argument is passed by keyword. Flask-Limiter 3.x takes ``key_func`` as
# its first positional parameter, so a positional ``app`` would collide with
# the ``key_func`` keyword. The keyword form works on 2.x and 3.x alike.
limiter = Limiter(
    key_func=get_remote_address,
    app=app,
    storage_uri="redis://localhost:6379",
)
# Custom rate limit based on API key
def get_api_key():
    """Return the rate-limit bucket key: the X-API-Key header, else the remote address."""
    # NOTE(review): the header value is used as-is here. If this runs before
    # the key is validated, a caller could vary it to get fresh buckets.
    fallback_key = get_remote_address()
    return request.headers.get('X-API-Key', fallback_key)
@app.route('/api/data')
@limiter.limit("100 per hour", key_func=get_api_key)
@require_api_key
def get_data():
    """Return the sample payload, limited to 100 requests per hour per key."""
    body = {'data': 'rate limited data'}
    return jsonify(body)
# Different limits for different endpoints
@app.route('/api/expensive-operation')
@limiter.limit("10 per hour")
@require_jwt_auth(['admin'])
def expensive_operation():
    """Return the sample result; limited to 10 requests per hour and admin-only."""
    body = {'result': 'expensive computation'}
    return jsonify(body)
# Custom rate limit handler
@app.errorhandler(429)
def ratelimit_handler(e):
    """Turn a 429 error into a JSON body that carries the error's ``retry_after``."""
    body = {
        'error': 'Rate limit exceeded',
        'retry_after': e.retry_after,
    }
    return jsonify(body), 429
Input Validation & Sanitization
from marshmallow import Schema, fields, validate, ValidationError
from flask import request
import bleach
class CreateUserSchema(Schema):
    """Validation rules for the create-user request body."""

    # Required fields must not declare a load default; marshmallow raises
    # ValueError at class creation when both are given.
    name = fields.Str(
        required=True,
        validate=validate.Length(min=1, max=100),
    )
    email = fields.Email(required=True)
    # Optional fields load as None when absent. ``load_default`` replaces the
    # deprecated ``missing`` argument (marshmallow 3.13+).
    age = fields.Int(
        validate=validate.Range(min=0, max=150),
        load_default=None,
    )
    bio = fields.Str(
        validate=validate.Length(max=500),
        load_default=None,
    )
def validate_and_sanitize_input(schema_class):
    """Build a decorator that validates the JSON body with ``schema_class``.

    The loaded data is stored on ``request.validated_data`` before the view
    runs. A non-empty ``bio`` value is cleaned with bleach, keeping only
    ``<p>`` and ``<br>``. A ``ValidationError`` produces a 400 JSON response
    carrying the error messages.
    """
    def decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            loader = schema_class()
            try:
                # Validate input data
                cleaned = loader.load(request.get_json())

                # Sanitize text fields
                if 'bio' in cleaned and cleaned['bio']:
                    cleaned['bio'] = bleach.clean(
                        cleaned['bio'],
                        tags=['p', 'br'],  # Allowed HTML tags
                        strip=True,
                    )

                # Add validated data to request
                request.validated_data = cleaned
                return f(*args, **kwargs)
            except ValidationError as e:
                failure = {
                    'error': 'Validation failed',
                    'details': e.messages,
                }
                return jsonify(failure), 400
        return decorated_function
    return decorator
@app.route('/api/users', methods=['POST'])
@require_jwt_auth()
@validate_and_sanitize_input(CreateUserSchema)
def create_user():
    """Create a user from the validated request body and return it with 201."""
    # Use validated and sanitized data
    new_user = user_service.create_user(request.validated_data)
    return jsonify(new_user.to_dict()), 201
CORS (Cross-Origin Resource Sharing)
from flask_cors import CORS
# Configure CORS for API
# The policy for every route under /api/ is named first, then handed to CORS.
_api_cors_policy = {
    "origins": ["https://dashboard.example.com", "https://app.example.com"],
    "methods": ["GET", "POST", "PUT", "DELETE"],
    "allow_headers": ["Content-Type", "Authorization", "X-API-Key"],
    "expose_headers": ["X-Total-Count", "X-Rate-Limit-Remaining"],
    "supports_credentials": True,
    "max_age": 3600,
}
CORS(app, resources={r"/api/*": _api_cors_policy})
# Custom CORS middleware for more control
# (Flask exports no ``after_request`` name to import; the hook is registered
# through the ``app.after_request`` decorator below.)
@app.after_request
def after_request(response):
    """Add CORS headers to the response when the request's Origin is allowed.

    Responses to requests from other origins, or without an Origin header,
    are returned unchanged.
    """
    origin = request.headers.get('Origin')
    # Check if origin is allowed
    allowed_origins = {'https://dashboard.example.com', 'https://app.example.com'}
    if origin in allowed_origins:
        # Assign rather than append, so a header already present on the
        # response is replaced instead of duplicated.
        response.headers['Access-Control-Allow-Origin'] = origin
        response.headers['Access-Control-Allow-Headers'] = 'Content-Type,Authorization'
        response.headers['Access-Control-Allow-Methods'] = 'GET,PUT,POST,DELETE,OPTIONS'
        response.headers['Access-Control-Allow-Credentials'] = 'true'
        # The response now depends on the Origin header; tell caches so.
        response.headers.add('Vary', 'Origin')
    return response
Advanced Authentication Patterns
Multi-Factor Authentication (MFA)
import pyotp
import qrcode
from io import BytesIO
import base64
class MFAService:
    """TOTP helper: secret generation, QR provisioning and code verification."""

    def __init__(self):
        self.issuer_name = "Your API Service"

    def generate_secret(self, user_id: str) -> str:
        """Generate TOTP secret for user. ``user_id`` is not used in the value."""
        new_secret = pyotp.random_base32()
        return new_secret

    def generate_qr_code(self, user_id: str, secret: str) -> str:
        """Generate QR code for TOTP setup.

        Returns:
            A ``data:image/png;base64,...`` URI encoding the provisioning URI
            for ``user_id`` under this service's issuer name.
        """
        uri = pyotp.totp.TOTP(secret).provisioning_uri(
            name=user_id,
            issuer_name=self.issuer_name,
        )

        # Generate QR code
        code = qrcode.QRCode(version=1, box_size=10, border=5)
        code.add_data(uri)
        code.make(fit=True)
        image = code.make_image(fill_color="black", back_color="white")

        # Convert to base64 string
        png_bytes = BytesIO()
        image.save(png_bytes, format='PNG')
        encoded = base64.b64encode(png_bytes.getvalue()).decode()
        return f"data:image/png;base64,{encoded}"

    def verify_token(self, secret: str, token: str) -> bool:
        """Verify TOTP token."""
        verifier = pyotp.TOTP(secret)
        # Allow 1 window tolerance
        return verifier.verify(token, valid_window=1)
@app.route('/api/auth/mfa/setup', methods=['POST'])
@require_jwt_auth()
def setup_mfa():
    """Create a TOTP secret for the caller, store it, and return it with its QR code."""
    current_user = request.user_id
    service = MFAService()
    new_secret = service.generate_secret(current_user)
    qr_image = service.generate_qr_code(current_user, new_secret)

    # Store secret securely (encrypted in database)
    user_service.set_mfa_secret(current_user, new_secret)

    enrollment = {
        'secret': new_secret,
        'qr_code': qr_image,
    }
    return jsonify(enrollment)
@app.route('/api/auth/mfa/verify', methods=['POST'])
@require_jwt_auth()
def verify_mfa():
    """Check the caller's TOTP code and mark the session as MFA verified.

    Returns 400 when the body has no ``token`` or the user has no stored MFA
    secret, 401 when the code does not verify, and a success message
    otherwise.
    """
    user_id = request.user_id
    # Tolerate a missing, malformed or non-object JSON body. All of these are
    # answered with the 400 below instead of an AttributeError.
    body = request.get_json(silent=True)
    token = body.get('token') if isinstance(body, dict) else None
    if not token:
        return jsonify({'error': 'TOTP token required'}), 400

    # Get user's MFA secret
    secret = user_service.get_mfa_secret(user_id)
    if not secret:
        return jsonify({'error': 'MFA not set up'}), 400

    mfa_service = MFAService()
    if not mfa_service.verify_token(secret, token):
        return jsonify({'error': 'Invalid TOTP token'}), 401

    # Mark session as MFA verified
    session['mfa_verified'] = True
    return jsonify({'message': 'MFA verified successfully'})
Single Sign-On (SSO)
from authlib.integrations.flask_client import OAuth
class SSOHandler:
    """Register the Google and Azure AD OAuth clients on a Flask app."""

    def __init__(self, app):
        """Create the Authlib ``OAuth`` registry for ``app`` and register providers."""
        self.app = app
        self.oauth = OAuth(app)
        self.setup_providers()

    def setup_providers(self):
        """Register both providers; credentials come from environment variables."""
        # Configure Google SSO
        google_settings = {
            'client_id': os.getenv('GOOGLE_CLIENT_ID'),
            'client_secret': os.getenv('GOOGLE_CLIENT_SECRET'),
            'server_metadata_url': 'https://accounts.google.com/.well-known/openid-configuration',
            'client_kwargs': {
                'scope': 'openid email profile'
            },
        }
        self.google = self.oauth.register(name='google', **google_settings)

        # Configure Microsoft Azure AD
        # NOTE(review): unlike Google, this registration gives explicit
        # authorize/token URLs and no metadata URL. Confirm that ID-token
        # parsing works for this provider.
        tenant = os.getenv('AZURE_TENANT_ID')
        azure_base = f"https://login.microsoftonline.com/{tenant}/oauth2/v2.0"
        self.azure = self.oauth.register(
            name='azure',
            client_id=os.getenv('AZURE_CLIENT_ID'),
            client_secret=os.getenv('AZURE_CLIENT_SECRET'),
            tenant_id=tenant,
            authorize_url=f"{azure_base}/authorize",
            token_url=f"{azure_base}/token",
            client_kwargs={'scope': 'openid profile email'},
        )
@app.route('/auth/login/<provider>')
def login(provider):
    """Redirect to the chosen provider's authorization page; 400 if unsupported."""
    sso = SSOHandler(app)
    if provider not in ('google', 'azure'):
        return jsonify({'error': 'Unsupported provider'}), 400

    oauth_client = sso.google if provider == 'google' else sso.azure
    callback_url = url_for('auth_callback', provider=provider, _external=True)
    return oauth_client.authorize_redirect(callback_url)
@app.route('/auth/callback/<provider>')
def auth_callback(provider):
    """Finish the SSO login: exchange the code, upsert the user, issue an API JWT.

    Returns 400 for an unknown provider or when any step raises. On success
    the caller is redirected to the frontend with the new token.
    """
    sso = SSOHandler(app)
    if provider not in ('google', 'azure'):
        return jsonify({'error': 'Invalid provider'}), 400

    try:
        oauth_client = sso.google if provider == 'google' else sso.azure
        token = oauth_client.authorize_access_token()
        user_info = token.get('userinfo')

        # Create or update user in your system
        user = user_service.get_or_create_user_from_sso(
            email=user_info['email'],
            name=user_info['name'],
            provider=provider,
            external_id=user_info['sub'],
        )

        # Generate JWT token for your API
        api_token = JWTAuth().generate_token(
            user_id=user.id,
            permissions=user.get_permissions(),
        )

        # Redirect to frontend with token
        # NOTE(review): the token travels in the query string, where it can
        # reach logs and browser history. Consider a different hand-off.
        return redirect(f"https://your-frontend.com/auth/success?token={api_token}")
    except Exception as e:
        logger.error(f"SSO callback error: {str(e)}")
        return jsonify({'error': 'Authentication failed'}), 400
Security Monitoring
Audit Logging
import logging
import json
from datetime import datetime
from functools import wraps
class SecurityAuditLogger:
    """Emit security events as one JSON object per line on the 'security_audit' logger."""

    def __init__(self):
        self.logger = logging.getLogger('security_audit')
        # getLogger returns the same logger object for a given name, so the
        # stream handler is attached only once. Otherwise every new instance
        # would add another handler and each event would be printed again.
        if not self.logger.handlers:
            handler = logging.StreamHandler()
            handler.setFormatter(logging.Formatter('%(message)s'))
            self.logger.addHandler(handler)
        self.logger.setLevel(logging.INFO)

    def log_event(self, event_type: str, user_id: str = None, **details):
        """Log one audit entry for the current Flask request.

        Args:
            event_type: Label stored under ``event_type``.
            user_id: Acting user, if known.
            **details: Extra JSON-serializable fields stored under ``details``.
        """
        from datetime import timezone  # local import: not in the snippet's import list

        # Same naive-UTC ISO format as before, without the deprecated utcnow().
        timestamp = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
        audit_entry = {
            'timestamp': timestamp,
            'event_type': event_type,
            'user_id': user_id,
            'ip_address': request.remote_addr,
            'user_agent': request.headers.get('User-Agent'),
            'details': details,
        }
        self.logger.info(json.dumps(audit_entry))
def audit_api_access(event_type: str):
    """Build a decorator that audit-logs each call to the wrapped view.

    A ``<event_type>_SUCCESS`` entry is logged after the view returns. If
    anything in that path raises, a ``<event_type>_FAILURE`` entry with the
    error text is logged and the exception is re-raised.
    """
    def decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            audit_logger = SecurityAuditLogger()
            user_id = getattr(request, 'user_id', None)

            def record(outcome, **extra):
                # Shared shape for both outcomes; ``extra`` carries the error text.
                audit_logger.log_event(
                    event_type=f"{event_type}_{outcome}",
                    user_id=user_id,
                    endpoint=request.endpoint,
                    method=request.method,
                    **extra,
                )

            try:
                result = f(*args, **kwargs)
                # Log successful access
                record("SUCCESS")
                return result
            except Exception as e:
                # Log failed access
                record("FAILURE", error=str(e))
                raise
        return decorated_function
    return decorator
@app.route('/api/sensitive-data')
@require_jwt_auth()
@audit_api_access('SENSITIVE_DATA_ACCESS')
def get_sensitive_data():
    """Return the sensitive payload; each call is audit-logged by the decorator."""
    body = {'data': 'highly sensitive information'}
    return jsonify(body)
Proper authentication and authorization are critical for API security. Implementing multiple layers of security, including strong authentication methods, role-based access control, rate limiting, and comprehensive audit logging, helps protect sensitive data and maintain system integrity in data engineering environments.