API Management
Authentication & Authorization

Authentication & Authorization

API security through authentication (verifying identity) and authorization (controlling access) is fundamental to protecting data and services. This guide covers modern approaches to securing APIs in data engineering environments.

Authentication Methods

API Keys

Simple authentication using unique keys for each client.

from functools import wraps
from flask import request, jsonify
import hashlib
import secrets
 
class APIKeyAuth:
    """In-memory API key registry used to authenticate clients.

    Every known key maps to a record holding the owning ``client_id`` and the
    list of ``permissions`` granted to that client.
    """

    def __init__(self, test_environment: bool = None):
        """Create the registry with its built-in demo keys.

        Args:
            test_environment: Force generated keys to carry the ``test_``
                (True) or ``prod_`` (False) prefix. When left as ``None``
                the ``API_ENV`` environment variable decides.
        """
        self._test_environment = test_environment
        # In production, store in database with proper hashing
        self.valid_keys = {
            'sk_test_abc123': {'client_id': 'client_1', 'permissions': ['read', 'write']},
            'sk_prod_xyz789': {'client_id': 'client_2', 'permissions': ['read']}
        }

    def is_test_environment(self) -> bool:
        """Return True when keys should be issued with the ``test_`` prefix.

        An explicit constructor argument wins; otherwise the ``API_ENV``
        environment variable is consulted and anything other than
        ``prod``/``production`` (case-insensitive) counts as a test setup.
        """
        forced = getattr(self, '_test_environment', None)
        if forced is not None:
            return bool(forced)

        import os  # local import: this snippet's import header does not bring in os
        environment = os.getenv('API_ENV', 'test').strip().lower()
        return environment not in ('prod', 'production')

    def generate_api_key(self, client_id: str) -> str:
        """Generate a secure API key for a client.

        The key is ``sk_`` + ``test_``/``prod_`` + a URL-safe random token.
        It is only returned, not added to ``valid_keys``; ``client_id`` is
        accepted for the caller's bookkeeping and is not embedded in the key.
        """
        prefix = "sk_" + ("test_" if self.is_test_environment() else "prod_")
        random_part = secrets.token_urlsafe(32)
        return prefix + random_part

    def validate_key(self, api_key: str) -> "dict | None":
        """Validate API key and return client info.

        Returns:
            The client record for a known key, or ``None`` when the key is
            unknown or is not a string at all.
        """
        if not isinstance(api_key, str):
            return None
        return self.valid_keys.get(api_key)
 
def require_api_key(f):
    """Decorator that rejects requests lacking a recognised API key.

    The key is taken from the ``X-API-Key`` header, falling back to the
    ``api_key`` query parameter. A missing or unknown key yields a 401 JSON
    response; otherwise the client's id and permissions are attached to
    ``request`` and the wrapped view runs.
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        supplied_key = request.headers.get('X-API-Key')
        if not supplied_key:
            supplied_key = request.args.get('api_key')

        if not supplied_key:
            return jsonify({'error': 'API key required'}), 401

        record = APIKeyAuth().validate_key(supplied_key)
        if not record:
            return jsonify({'error': 'Invalid API key'}), 401

        # Expose the caller's identity to the view through the request object.
        request.client_id = record['client_id']
        request.permissions = record['permissions']

        return f(*args, **kwargs)

    return decorated_function
 
@app.route('/api/data')
@require_api_key
def get_data():
    """Return the protected payload to API-key clients holding ``read``."""
    granted = request.permissions
    if 'read' in granted:
        return jsonify({'data': 'sensitive information'})

    return jsonify({'error': 'Insufficient permissions'}), 403

JWT (JSON Web Tokens)

Stateless authentication using signed tokens containing claims.

import jwt
from datetime import datetime, timedelta
from functools import wraps
import os
 
class AuthenticationError(Exception):
    """Raised when a JWT cannot be accepted (expired or otherwise invalid)."""


class JWTAuth:
    """Issue and verify HS256-signed JSON Web Tokens.

    The signing secret is read from the ``JWT_SECRET_KEY`` environment
    variable; tokens expire 24 hours after they are issued.
    """

    def __init__(self):
        self.secret_key = os.getenv('JWT_SECRET_KEY', 'your-secret-key')
        if self.secret_key == 'your-secret-key':
            # The placeholder is publicly known, so tokens signed with it can
            # be forged; keep working for demos but make the problem visible.
            import warnings
            warnings.warn(
                'JWT_SECRET_KEY is not set; using an insecure placeholder secret',
                RuntimeWarning,
                stacklevel=2,
            )
        self.algorithm = 'HS256'
        self.expiration_delta = timedelta(hours=24)

    def generate_token(self, user_id: str, permissions: list = None) -> str:
        """Generate JWT token for authenticated user.

        Args:
            user_id: Identifier stored in the ``user_id`` claim.
            permissions: Permission names stored in the ``permissions``
                claim; ``None`` is stored as an empty list.

        Returns:
            The encoded, signed token.
        """
        from datetime import timezone

        # One timezone-aware timestamp for both claims, so that
        # exp - iat is exactly ``expiration_delta``.
        issued_at = datetime.now(timezone.utc)
        payload = {
            'user_id': user_id,
            'permissions': permissions or [],
            'exp': issued_at + self.expiration_delta,
            'iat': issued_at,
            'iss': 'your-api-service'
        }

        return jwt.encode(payload, self.secret_key, algorithm=self.algorithm)

    def validate_token(self, token: str) -> dict:
        """Validate and decode JWT token.

        Returns:
            The decoded claims.

        Raises:
            AuthenticationError: If the token has expired or fails any other
                PyJWT validation (bad signature, malformed token, ...).
        """
        try:
            return jwt.decode(
                token,
                self.secret_key,
                algorithms=[self.algorithm],
                options={'verify_exp': True}
            )
        except jwt.ExpiredSignatureError as exc:
            # Must be handled first: it is a subclass of InvalidTokenError.
            raise AuthenticationError('Token has expired') from exc
        except jwt.InvalidTokenError as exc:
            raise AuthenticationError('Invalid token') from exc
 
def require_jwt_auth(required_permissions: list = None):
    """Build a decorator that authenticates a request by Bearer JWT.

    Args:
        required_permissions: Permission names that must all appear in the
            token's ``permissions`` claim. ``None`` or empty means any valid
            token is accepted.

    The wrapped view receives ``request.user_id`` and ``request.permissions``
    taken from the token. Failures are answered with JSON: 401 for a missing,
    malformed, expired or invalid token (including one without a ``user_id``
    claim), 403 when required permissions are missing.
    """
    def decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            auth_header = request.headers.get('Authorization')

            if not auth_header or not auth_header.startswith('Bearer '):
                return jsonify({'error': 'Bearer token required'}), 401

            # Everything after the scheme is the token; tolerate extra spaces.
            token = auth_header[len('Bearer '):].strip()

            try:
                jwt_auth = JWTAuth()
                payload = jwt_auth.validate_token(token)

                # A correctly signed token may still lack our custom claims;
                # read them defensively instead of failing with KeyError.
                user_id = payload.get('user_id')
                if user_id is None:
                    return jsonify({'error': 'Invalid token'}), 401
                user_permissions = payload.get('permissions') or []

                # Check permissions if required
                if required_permissions:
                    if not set(required_permissions).issubset(user_permissions):
                        return jsonify({'error': 'Insufficient permissions'}), 403

                # Add user info to request context
                request.user_id = user_id
                request.permissions = user_permissions

                return f(*args, **kwargs)

            except AuthenticationError as e:
                return jsonify({'error': str(e)}), 401

        return decorated_function
    return decorator
 
@app.route('/api/admin/users')
@require_jwt_auth(required_permissions=['admin'])
def admin_get_users():
    """Return the user listing; the JWT must carry the ``admin`` permission."""
    body = {'users': 'admin data'}
    return jsonify(body)

OAuth 2.0

Industry-standard authorization framework for secure API access.

from authlib.integrations.flask_oauth2 import ResourceProtector
from authlib.oauth2.rfc6749 import grants
from authlib.oauth2 import OAuth2Error
 
class OAuth2Handler:
    """Attach an Authlib OAuth 2.0 authorization server to a Flask app.

    The server is built during construction and kept on
    ``self.authorization_server``.
    """

    def __init__(self, app):
        self.app = app
        self.setup_oauth2_server()

    def setup_oauth2_server(self):
        """Configure OAuth2 server with grant types."""
        from authlib.integrations.flask_oauth2 import AuthorizationServer
        # NOTE(review): these models are imported but not passed to the
        # server here; presumably they back query_client/save_token - confirm.
        from your_models import OAuth2Client, OAuth2Token

        self.authorization_server = AuthorizationServer(self.app)

        # Configure supported grant types
        supported_grants = (
            grants.AuthorizationCodeGrant,
            grants.RefreshTokenGrant,
            grants.ClientCredentialsGrant,
        )
        for grant_cls in supported_grants:
            self.authorization_server.register_grant(grant_cls)

        # Token introspection. register_endpoint() takes only the endpoint
        # class; the endpoint's name comes from the class itself.
        # NOTE(review): TokenIntrospectionEndpoint is not defined in the
        # visible code - it must be provided elsewhere.
        self.authorization_server.register_endpoint(TokenIntrospectionEndpoint)
 
# OAuth2 protected resource
# Shared ResourceProtector instance: called with a scope it acts as a view
# decorator, and it is also used to fetch the request's access token.
# NOTE(review): no token validator is registered in the visible code -
# confirm one is registered elsewhere before this is used.
require_oauth = ResourceProtector()
 
# The route decorator must be outermost: Flask registers whatever function it
# receives, so the OAuth check has to be applied before registration.
@app.route('/api/protected-data')
@require_oauth('read')
def protected_data():
    """Return protected data plus the calling client's id and token scopes.

    Requires an access token with the ``read`` scope.
    """
    # Access token is automatically validated
    current_token = require_oauth.acquire_token()
    client_id = current_token.client_id
    scopes = current_token.get_scopes()

    return jsonify({
        'message': 'This is protected data',
        'client_id': client_id,
        'scopes': scopes
    })
 
# Token endpoint
@app.route('/oauth/token', methods=['POST'])
def issue_token():
    """Token endpoint: delegate to the authorization server.

    An ``OAuth2Error`` is turned into a JSON response carrying the error's
    own body and status code.
    NOTE(review): ``authorization_server`` must be a module-level name; the
    visible code only creates one as an attribute of ``OAuth2Handler``.
    """
    try:
        token_response = authorization_server.create_token_response()
    except OAuth2Error as error:
        return jsonify(error.get_body()), error.status_code
    return token_response

Role-Based Access Control (RBAC)

Role Definition

from enum import Enum
from dataclasses import dataclass
from typing import List, Set
 
class Permission(Enum):
    """Permissions that can be granted to a role.

    Each value is a ``resource:action`` string.
    """
    # User records
    READ_USERS = "users:read"
    WRITE_USERS = "users:write"
    DELETE_USERS = "users:delete"
    # Data
    READ_DATA = "data:read"
    WRITE_DATA = "data:write"
    # Administration and analytics
    ADMIN_ACCESS = "admin:access"
    ANALYTICS_VIEW = "analytics:view"
 
@dataclass
class Role:
    """A named bundle of permissions."""
    # Role identifier; RoleManager uses the same string as its lookup key.
    name: str
    # Permissions granted to holders of this role.
    permissions: Set[Permission]
    # Optional human-readable summary; empty by default.
    description: str = ""
 
class RoleManager:
    """Registry of the built-in roles (viewer, editor, admin)."""

    def __init__(self):
        read_only = {Permission.READ_USERS, Permission.READ_DATA, Permission.ANALYTICS_VIEW}
        read_write = read_only | {Permission.WRITE_USERS, Permission.WRITE_DATA}

        built_in = (
            Role(
                name='viewer',
                permissions=read_only,
                description='Read-only access to users and data',
            ),
            Role(
                name='editor',
                permissions=read_write,
                description='Read and write access to users and data',
            ),
            Role(
                name='admin',
                permissions=set(Permission),  # every member of the enum
                description='Full administrative access',
            ),
        )
        self.roles = {role.name: role for role in built_in}

    def get_role(self, role_name: str) -> Role:
        """Return the role registered under *role_name*, or None if unknown."""
        return self.roles.get(role_name)

    def user_has_permission(self, user_roles: List[str], required_permission: Permission) -> bool:
        """Tell whether any of the user's roles grants *required_permission*.

        Role names that are not registered are ignored.
        """
        resolved = (self.get_role(role_name) for role_name in user_roles)
        granted = set().union(*(role.permissions for role in resolved if role))
        return required_permission in granted
 
def require_permission(permission: Permission):
    """Build a decorator that lets a view run only if the user's roles grant *permission*.

    Roles are read from ``request.user_roles`` (an empty list when the
    attribute is absent); a 403 JSON response is returned otherwise.
    """
    def decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            # Assume user roles are available in request context
            roles_on_request = getattr(request, 'user_roles', [])

            allowed = RoleManager().user_has_permission(roles_on_request, permission)
            if allowed:
                return f(*args, **kwargs)

            return jsonify({'error': 'Insufficient permissions'}), 403

        return decorated_function
    return decorator
 
@app.route('/api/users', methods=['DELETE'])
@require_jwt_auth()
@require_permission(Permission.DELETE_USERS)
def delete_user():
    """Confirm a user deletion; needs a valid JWT and ``DELETE_USERS``."""
    confirmation = {'message': 'User deleted'}
    return jsonify(confirmation)

Security Best Practices

Rate Limiting

from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
import redis
 
# Redis-backed rate limiter
# ``app`` is passed by keyword: newer Flask-Limiter releases take ``key_func``
# as the first positional parameter, so ``Limiter(app, ...)`` would hand the
# application object to the wrong parameter there.
limiter = Limiter(
    key_func=get_remote_address,
    app=app,
    storage_uri="redis://localhost:6379"
)
 
# Custom rate limit based on API key
def get_api_key():
    """Rate-limit key: the ``X-API-Key`` header, else the remote address."""
    fallback = get_remote_address()
    return request.headers.get('X-API-Key', fallback)
 
@app.route('/api/data')
@limiter.limit("100 per hour", key_func=get_api_key)
@require_api_key
def get_data():
    """Return data; limited to 100 requests per hour per API key."""
    body = {'data': 'rate limited data'}
    return jsonify(body)
 
# Different limits for different endpoints
@app.route('/api/expensive-operation')
@limiter.limit("10 per hour")
@require_jwt_auth(['admin'])
def expensive_operation():
    """Admin-only endpoint limited to 10 requests per hour."""
    outcome = {'result': 'expensive computation'}
    return jsonify(outcome)
 
# Custom rate limit handler
@app.errorhandler(429)
def ratelimit_handler(e):
    """Answer HTTP 429 errors with a JSON body that includes ``retry_after``."""
    body = {'error': 'Rate limit exceeded'}
    body['retry_after'] = e.retry_after
    return jsonify(body), 429

Input Validation & Sanitization

from marshmallow import Schema, fields, validate, ValidationError
from flask import request
import bleach
 
class CreateUserSchema(Schema):
    """Payload accepted when creating a user.

    ``name`` and ``email`` are mandatory; ``age`` and ``bio`` are optional
    and load as ``None`` when omitted.
    """
    # A required field must not declare a load default: marshmallow rejects
    # that combination with ValueError as soon as the class is defined.
    name = fields.Str(
        required=True,
        validate=validate.Length(min=1, max=100)
    )
    email = fields.Email(required=True)
    # ``load_default`` is the current name of the deprecated ``missing``.
    age = fields.Int(
        validate=validate.Range(min=0, max=150),
        load_default=None
    )
    bio = fields.Str(
        validate=validate.Length(max=500),
        load_default=None
    )
 
def validate_and_sanitize_input(schema_class):
    """Build a decorator that validates the JSON body against *schema_class*.

    The loaded data is stored on ``request.validated_data`` after any
    non-empty ``bio`` value has been cleaned with bleach (only ``<p>`` and
    ``<br>`` survive). A ``ValidationError`` becomes a 400 JSON response
    listing the messages.
    """
    def decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            loader = schema_class()

            try:
                cleaned = loader.load(request.get_json())

                # Strip disallowed markup from the free-text field.
                raw_bio = cleaned['bio'] if 'bio' in cleaned else None
                if raw_bio:
                    cleaned['bio'] = bleach.clean(
                        raw_bio,
                        tags=['p', 'br'],
                        strip=True,
                    )

                request.validated_data = cleaned
                return f(*args, **kwargs)

            except ValidationError as exc:
                failure = {
                    'error': 'Validation failed',
                    'details': exc.messages,
                }
                return jsonify(failure), 400

        return decorated_function
    return decorator
 
@app.route('/api/users', methods=['POST'])
@require_jwt_auth()
@validate_and_sanitize_input(CreateUserSchema)
def create_user():
    """Create a user from the validated request body and return it (201)."""
    # The decorator above has already validated and sanitized this payload.
    payload = request.validated_data
    created = user_service.create_user(payload)
    return jsonify(created.to_dict()), 201

CORS (Cross-Origin Resource Sharing)

from flask_cors import CORS
 
# Configure CORS for API
CORS(app, resources={
    # One policy for every route under /api/.
    r"/api/*": dict(
        origins=["https://dashboard.example.com", "https://app.example.com"],
        methods=["GET", "POST", "PUT", "DELETE"],
        allow_headers=["Content-Type", "Authorization", "X-API-Key"],
        expose_headers=["X-Total-Count", "X-Rate-Limit-Remaining"],
        supports_credentials=True,
        max_age=3600,
    ),
})
 
# Custom CORS middleware for more control
# ``after_request`` is an application decorator, not something importable
# from flask; the hook below only needs the request proxy.
from flask import request


@app.after_request
def after_request(response):
    """Attach CORS headers when the request's Origin is on the allow-list.

    Headers are assigned rather than appended, so a response that already
    carries CORS headers does not end up with duplicates.
    """
    origin = request.headers.get('Origin')

    # Check if origin is allowed
    allowed_origins = ['https://dashboard.example.com', 'https://app.example.com']

    if origin in allowed_origins:
        response.headers['Access-Control-Allow-Origin'] = origin
        # X-API-Key is included because the API-key endpoints read that header.
        response.headers['Access-Control-Allow-Headers'] = 'Content-Type,Authorization,X-API-Key'
        response.headers['Access-Control-Allow-Methods'] = 'GET,PUT,POST,DELETE,OPTIONS'
        response.headers['Access-Control-Allow-Credentials'] = 'true'
        # The allowed origin is echoed per request, so caches must key on it.
        response.vary.add('Origin')

    return response

Advanced Authentication Patterns

Multi-Factor Authentication (MFA)

import pyotp
import qrcode
from io import BytesIO
import base64
 
class MFAService:
    """TOTP helpers: secret generation, QR provisioning and code checking."""

    def __init__(self):
        self.issuer_name = "Your API Service"

    def generate_secret(self, user_id: str) -> str:
        """Return a fresh random base32 TOTP secret (*user_id* is not used)."""
        return pyotp.random_base32()

    def generate_qr_code(self, user_id: str, secret: str) -> str:
        """Return the TOTP provisioning URI as a PNG QR code in a data URL."""
        uri = pyotp.totp.TOTP(secret).provisioning_uri(
            name=user_id,
            issuer_name=self.issuer_name,
        )

        # Render the URI as a QR code image.
        code = qrcode.QRCode(version=1, box_size=10, border=5)
        code.add_data(uri)
        code.make(fit=True)
        picture = code.make_image(fill_color="black", back_color="white")

        # Serialize the image to PNG in memory and base64-encode it.
        png_bytes = BytesIO()
        picture.save(png_bytes, format='PNG')
        img_str = base64.b64encode(png_bytes.getvalue()).decode()

        return f"data:image/png;base64,{img_str}"

    def verify_token(self, secret: str, token: str) -> bool:
        """Check *token* against *secret*, with ``valid_window=1`` tolerance."""
        return pyotp.TOTP(secret).verify(token, valid_window=1)
 
@app.route('/api/auth/mfa/setup', methods=['POST'])
@require_jwt_auth()
def setup_mfa():
    """Create a TOTP secret for the caller, store it, and return it with a QR code."""
    user_id = request.user_id

    service = MFAService()
    new_secret = service.generate_secret(user_id)
    qr_data_url = service.generate_qr_code(user_id, new_secret)

    # Store secret securely (encrypted in database)
    user_service.set_mfa_secret(user_id, new_secret)

    enrollment = {'secret': new_secret, 'qr_code': qr_data_url}
    return jsonify(enrollment)
 
@app.route('/api/auth/mfa/verify', methods=['POST'])
@require_jwt_auth()
def verify_mfa():
    """Check the caller's TOTP code and mark the session as MFA-verified.

    Responds 400 when the code is missing or MFA is not set up, 401 when the
    code is wrong, and 200 on success.
    """
    # Local import: the session proxy is not imported anywhere in this file.
    from flask import session

    user_id = request.user_id

    # A missing, non-JSON or non-object body counts as "no token supplied"
    # rather than raising.
    body = request.get_json(silent=True)
    token = body.get('token') if isinstance(body, dict) else None

    if not token:
        return jsonify({'error': 'TOTP token required'}), 400

    # Get user's MFA secret
    secret = user_service.get_mfa_secret(user_id)
    if not secret:
        return jsonify({'error': 'MFA not set up'}), 400

    mfa_service = MFAService()
    if mfa_service.verify_token(secret, token):
        # Mark session as MFA verified
        session['mfa_verified'] = True
        return jsonify({'message': 'MFA verified successfully'})

    return jsonify({'error': 'Invalid TOTP token'}), 401

Single Sign-On (SSO)

from authlib.integrations.flask_client import OAuth
 
class SSOHandler:
    """Register the Google and Azure AD OpenID Connect clients on a Flask app.

    The registered clients are available as ``self.google`` and
    ``self.azure``; credentials come from environment variables.
    """

    def __init__(self, app):
        self.app = app
        self.oauth = OAuth(app)
        self.setup_providers()

    def setup_providers(self):
        """Register both providers from their OpenID discovery documents."""
        # Configure Google SSO
        self.google = self.oauth.register(
            name='google',
            client_id=os.getenv('GOOGLE_CLIENT_ID'),
            client_secret=os.getenv('GOOGLE_CLIENT_SECRET'),
            server_metadata_url='https://accounts.google.com/.well-known/openid-configuration',
            client_kwargs={
                'scope': 'openid email profile'
            }
        )

        # Configure Microsoft Azure AD
        # The tenant's discovery document supplies the authorize, token and
        # JWKS endpoints. Hand-written endpoint URLs are not enough here: the
        # token endpoint option is named ``access_token_url`` in Authlib, and
        # validating the ID token requires the JWKS location from metadata.
        tenant_id = os.getenv('AZURE_TENANT_ID')
        self.azure = self.oauth.register(
            name='azure',
            client_id=os.getenv('AZURE_CLIENT_ID'),
            client_secret=os.getenv('AZURE_CLIENT_SECRET'),
            server_metadata_url=(
                f"https://login.microsoftonline.com/{tenant_id}"
                "/v2.0/.well-known/openid-configuration"
            ),
            client_kwargs={'scope': 'openid profile email'}
        )
 
@app.route('/auth/login/<provider>')
def login(provider):
    """Redirect the browser to the chosen SSO provider's login page.

    Supported providers are ``google`` and ``azure``; anything else gets a
    400 JSON response.
    """
    # Local import: url_for is not imported anywhere in this file.
    from flask import url_for

    if provider not in ('google', 'azure'):
        return jsonify({'error': 'Unsupported provider'}), 400

    sso = SSOHandler(app)
    # The registered client is stored under the provider's own name.
    client = getattr(sso, provider)
    redirect_uri = url_for('auth_callback', provider=provider, _external=True)
    return client.authorize_redirect(redirect_uri)
 
@app.route('/auth/callback/<provider>')
def auth_callback(provider):
    """Finish the SSO login and hand the frontend an API token.

    Exchanges the provider's authorization response for user info, creates
    or updates the local user, issues a JWT and redirects to the frontend.
    An unknown provider or any failure is answered with a 400 JSON response.
    """
    # Local imports: neither ``redirect`` nor a logger is defined in this file.
    import logging
    from flask import redirect

    log = logging.getLogger(__name__)

    if provider not in ('google', 'azure'):
        return jsonify({'error': 'Invalid provider'}), 400

    sso = SSOHandler(app)

    try:
        # The registered client is stored under the provider's own name.
        token = getattr(sso, provider).authorize_access_token()
        user_info = token.get('userinfo')
        if not user_info:
            log.error("SSO callback error: no userinfo returned by %s", provider)
            return jsonify({'error': 'Authentication failed'}), 400

        # Create or update user in your system
        user = user_service.get_or_create_user_from_sso(
            email=user_info['email'],
            name=user_info['name'],
            provider=provider,
            external_id=user_info['sub']
        )

        # Generate JWT token for your API
        jwt_auth = JWTAuth()
        api_token = jwt_auth.generate_token(
            user_id=user.id,
            permissions=user.get_permissions()
        )

        # Redirect to frontend with token
        # NOTE(review): a token in the query string ends up in browser
        # history, server logs and Referer headers - consider a URL fragment
        # or a short-lived one-time code instead.
        return redirect(f"https://your-frontend.com/auth/success?token={api_token}")

    except Exception as e:
        # Top-level boundary: record the traceback, reveal nothing to the client.
        log.exception("SSO callback error: %s", e)
        return jsonify({'error': 'Authentication failed'}), 400

Security Monitoring

Audit Logging

import logging
import json
from datetime import datetime
from functools import wraps
 
class SecurityAuditLogger:
    """Write security events as one JSON document per log line.

    Every instance shares the ``security_audit`` logger.
    """

    def __init__(self):
        self.logger = logging.getLogger('security_audit')
        # The named logger is process-wide, so attach the stream handler only
        # once; otherwise each new instance would add another handler and
        # every event would be emitted repeatedly.
        if not self.logger.handlers:
            handler = logging.StreamHandler()
            handler.setFormatter(logging.Formatter('%(message)s'))
            self.logger.addHandler(handler)
        self.logger.setLevel(logging.INFO)

    def log_event(self, event_type: str, user_id: str = None, **details):
        """Log one audit entry for the current request.

        Args:
            event_type: Name of the event being recorded.
            user_id: Acting user, if known.
            **details: Extra fields stored under the ``details`` key.
        """
        from datetime import timezone

        # Same naive-UTC text as before, without the deprecated utcnow().
        timestamp = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
        audit_entry = {
            'timestamp': timestamp,
            'event_type': event_type,
            'user_id': user_id,
            'ip_address': request.remote_addr,
            'user_agent': request.headers.get('User-Agent'),
            'details': details
        }

        self.logger.info(json.dumps(audit_entry))
 
def audit_api_access(event_type: str):
    """Build a decorator that audit-logs every call to the wrapped view.

    A call that returns is logged as ``<event_type>_SUCCESS``; a call that
    raises is logged as ``<event_type>_FAILURE`` with the error text, and
    the exception is re-raised.
    """
    def decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            audit_logger = SecurityAuditLogger()
            user_id = getattr(request, 'user_id', None)

            def record(event_name, **extra):
                # Endpoint and method are read at logging time.
                audit_logger.log_event(
                    event_type=event_name,
                    user_id=user_id,
                    endpoint=request.endpoint,
                    method=request.method,
                    **extra,
                )

            try:
                result = f(*args, **kwargs)
                # Log successful access
                record(f"{event_type}_SUCCESS")
                return result
            except Exception as e:
                # Log failed access
                record(f"{event_type}_FAILURE", error=str(e))
                raise

        return decorated_function
    return decorator
 
@app.route('/api/sensitive-data')
@require_jwt_auth()
@audit_api_access('SENSITIVE_DATA_ACCESS')
def get_sensitive_data():
    """Return sensitive data; needs a valid JWT and every call is audit-logged."""
    body = {'data': 'highly sensitive information'}
    return jsonify(body)

Proper authentication and authorization are critical for API security. Implementing multiple layers of security, including strong authentication methods, role-based access control, rate limiting, and comprehensive audit logging, helps protect sensitive data and maintain system integrity in data engineering environments.