Monitoring & Analytics

API monitoring and analytics provide crucial insights into system performance, usage patterns, and potential issues. Effective monitoring enables proactive maintenance and optimization of API services.

Key Metrics

Performance Metrics

  • Response Time: Average, median, 95th percentile
  • Throughput: Requests per second/minute
  • Error Rates: 4xx and 5xx error percentages
  • Availability: Uptime percentage
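
To make these concrete, here is a minimal sketch (a hypothetical helper, not part of the reference implementation) that derives the headline numbers from a window of recorded requests, each shaped like {'duration_ms': float, 'status': int}:

import statistics

def summarize_performance(requests, window_seconds=60):
    """Hypothetical helper: compute the metrics above for a non-empty window."""
    durations = sorted(r['duration_ms'] for r in requests)
    errors = sum(1 for r in requests if r['status'] >= 400)
    p95_index = max(int(len(durations) * 0.95) - 1, 0)
    return {
        'avg_ms': statistics.mean(durations),
        'median_ms': statistics.median(durations),
        'p95_ms': durations[p95_index],
        'throughput_rps': len(requests) / window_seconds,
        'error_rate_pct': 100.0 * errors / len(requests)
    }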

Business Metrics

  • API Usage: Requests per client/endpoint
  • Rate Limit Hits: Clients hitting rate limits
  • Authentication Failures: Failed auth attempts
  • Popular Endpoints: Most frequently used APIs
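
These business metrics can be captured with the same Prometheus primitives used in the next section; a brief sketch (the metric names here are illustrative, not taken from the reference code):

from prometheus_client import Counter

RATE_LIMIT_HITS = Counter(
    'api_rate_limit_hits_total',
    'Requests rejected because a client exceeded its rate limit',
    ['client_id']
)

AUTH_FAILURES = Counter(
    'api_auth_failures_total',
    'Failed authentication attempts',
    ['reason']
)

# Example: increment when a limiter rejects a request
# RATE_LIMIT_HITS.labels(client_id=client_id).inc()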

Monitoring Implementation

Custom Metrics Collection

from prometheus_client import Counter, Histogram, Gauge
from flask import request, g
import time
from functools import wraps
 
# Define Prometheus metrics
REQUEST_COUNT = Counter(
    'api_requests_total',
    'Total API requests',
    ['method', 'endpoint', 'status']
)
 
REQUEST_LATENCY = Histogram(
    'api_request_duration_seconds',
    'API request duration',
    ['method', 'endpoint']
)
 
ACTIVE_CONNECTIONS = Gauge(
    'api_active_connections',
    'Number of active connections'
)
 
ERROR_RATE = Counter(
    'api_errors_total',
    'Total API errors',
    ['endpoint', 'error_type']
)
 
def track_metrics(f):
    @wraps(f)
    def decorated_function(*args, **kwargs):
        start_time = time.time()
        endpoint = request.endpoint or 'unknown'
        method = request.method
        
        try:
            response = f(*args, **kwargs)
            status_code = response.status_code if hasattr(response, 'status_code') else 200
            
            # Record metrics
            REQUEST_COUNT.labels(
                method=method,
                endpoint=endpoint,
                status=status_code
            ).inc()
            
            duration = time.time() - start_time
            REQUEST_LATENCY.labels(
                method=method,
                endpoint=endpoint
            ).observe(duration)
            
            return response
            
        except Exception as e:
            # Record error metrics
            ERROR_RATE.labels(
                endpoint=endpoint,
                error_type=type(e).__name__
            ).inc()
            
            REQUEST_COUNT.labels(
                method=method,
                endpoint=endpoint,
                status=500
            ).inc()
            
            raise
    
    return decorated_function
 
@app.route('/metrics')
def metrics():
    from prometheus_client import generate_latest, CONTENT_TYPE_LATEST
    # Serve metrics in the Prometheus text exposition format
    return generate_latest(), 200, {'Content-Type': CONTENT_TYPE_LATEST}
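
As a usage sketch (assuming app is the existing Flask application and jsonify is imported), the decorator wraps individual views while Prometheus scrapes the /metrics endpoint defined above; the route below is hypothetical:

@app.route('/api/orders/<int:order_id>')
@track_metrics
def get_order(order_id):
    # ...load the order from storage (omitted)...
    return jsonify({'id': order_id, 'status': 'shipped'})

# A scrape of /metrics then exposes series such as:
# api_requests_total{method="GET",endpoint="get_order",status="200"} 1.0
# api_request_duration_seconds_count{method="GET",endpoint="get_order"} 1.0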

Health Checks

import time
import psutil
import redis
from datetime import datetime
from flask import jsonify
from sqlalchemy import text
 
class HealthChecker:
    def __init__(self, db, redis_client):
        self.db = db
        self.redis_client = redis_client
    
    def check_database(self):
        try:
            start = time.time()
            self.db.session.execute(text('SELECT 1'))
            return {'status': 'healthy', 'response_time': time.time() - start}
        except Exception as e:
            return {'status': 'unhealthy', 'error': str(e)}
    
    def check_redis(self):
        try:
            start = time.time()
            self.redis_client.ping()
            response_time = time.time() - start
            return {'status': 'healthy', 'response_time': response_time}
        except Exception as e:
            return {'status': 'unhealthy', 'error': str(e)}
    
    def check_disk_space(self):
        disk = psutil.disk_usage('/')
        free_percent = (disk.free / disk.total) * 100
        
        if free_percent < 10:
            status = 'critical'
        elif free_percent < 20:
            status = 'warning'
        else:
            status = 'healthy'
        
        return {
            'status': status,
            'free_space_percent': free_percent,
            'free_space_gb': disk.free // (1024**3)
        }
    
    def check_memory(self):
        memory = psutil.virtual_memory()
        
        if memory.percent > 90:
            status = 'critical'
        elif memory.percent > 80:
            status = 'warning'
        else:
            status = 'healthy'
        
        return {
            'status': status,
            'used_percent': memory.percent,
            'available_gb': memory.available // (1024**3)
        }
 
@app.route('/health')
def health_check():
    health_checker = HealthChecker(db, redis_client)
    
    checks = {
        'database': health_checker.check_database(),
        'redis': health_checker.check_redis(),
        'disk': health_checker.check_disk_space(),
        'memory': health_checker.check_memory()
    }
    
    # Determine overall status
    overall_status = 'healthy'
    for check in checks.values():
        if check['status'] == 'critical':
            overall_status = 'critical'
            break
        elif check['status'] == 'unhealthy':
            overall_status = 'unhealthy'
        elif check['status'] == 'warning' and overall_status == 'healthy':
            overall_status = 'warning'
    
    response_code = 200 if overall_status in ['healthy', 'warning'] else 503
    
    return jsonify({
        'status': overall_status,
        'timestamp': datetime.utcnow().isoformat(),
        'checks': checks
    }), response_code
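
For reference, a healthy response from this endpoint looks roughly like the following (all values are illustrative):

{
  "status": "healthy",
  "timestamp": "2024-01-15T10:32:05.123456",
  "checks": {
    "database": {"status": "healthy", "response_time": 0.002},
    "redis": {"status": "healthy", "response_time": 0.001},
    "disk": {"status": "healthy", "free_space_percent": 62.4, "free_space_gb": 120},
    "memory": {"status": "healthy", "used_percent": 41.3, "available_gb": 9}
  }
}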

Logging Strategies

Structured Logging

import logging
import json
import time
from datetime import datetime
from flask import request, g
 
class APILogger:
    def __init__(self, app):
        self.app = app
        self.setup_logging()
    
    def setup_logging(self):
        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        
        # File handler for API logs
        file_handler = logging.FileHandler('api.log')
        file_handler.setFormatter(formatter)
        
        # Console handler
        console_handler = logging.StreamHandler()
        console_handler.setFormatter(formatter)
        
        # Configure logger
        logger = logging.getLogger('api')
        logger.setLevel(logging.INFO)
        logger.addHandler(file_handler)
        logger.addHandler(console_handler)
        
        self.logger = logger
    
    def log_request(self, response):
        log_data = {
            'timestamp': datetime.utcnow().isoformat(),
            'method': request.method,
            'url': request.url,
            'status_code': response.status_code,
            'response_time': round(time.time() - g.request_start_time, 4) if hasattr(g, 'request_start_time') else None,
            'user_agent': request.headers.get('User-Agent'),
            'ip_address': request.remote_addr,
            'user_id': getattr(request, 'user_id', None),
            'request_id': getattr(g, 'request_id', None)
        }
        
        # Log errors with more detail
        if response.status_code >= 400:
            log_data['error_details'] = {
                'request_data': request.get_json() if request.is_json else None,
                'headers': dict(request.headers)
            }
            
            self.logger.error(json.dumps(log_data))
        else:
            self.logger.info(json.dumps(log_data))

Request Tracing

import time
import uuid
from flask import g, request
 
# Create the request logger once at startup; constructing APILogger per request
# would attach duplicate handlers and repeat every log line.
api_logger = APILogger(app)
 
@app.before_request
def before_request():
    g.request_start_time = time.time()
    g.request_id = request.headers.get('X-Request-ID') or str(uuid.uuid4())
 
@app.after_request
def add_request_id(response):
    # Echo the request ID so clients and downstream services can correlate logs
    response.headers['X-Request-ID'] = g.request_id
    return response
 
@app.after_request
def log_request(response):
    duration = time.time() - g.request_start_time
    
    # Update metrics
    REQUEST_LATENCY.labels(
        method=request.method,
        endpoint=request.endpoint or 'unknown'
    ).observe(duration)
    
    # Log request
    api_logger.log_request(response)
    
    return response

Analytics Dashboard

Usage Analytics

from collections import defaultdict
from datetime import datetime, timedelta
import json
 
class UsageAnalytics:
    def __init__(self, redis_client):
        self.redis = redis_client
    
    def track_api_usage(self, endpoint, client_id, method='GET'):
        """Track API usage in Redis."""
        today = datetime.now().strftime('%Y-%m-%d')
        hour = datetime.now().strftime('%H')
        
        # Daily stats
        self.redis.hincrby(f'usage:daily:{today}', f'{endpoint}:{method}', 1)
        self.redis.hincrby(f'usage:daily:{today}:client', client_id, 1)
        
        # Hourly stats
        self.redis.hincrby(f'usage:hourly:{today}:{hour}', f'{endpoint}:{method}', 1)
        
        # Set expiration
        self.redis.expire(f'usage:daily:{today}', 86400 * 30)  # 30 days
        self.redis.expire(f'usage:hourly:{today}:{hour}', 86400 * 7)  # 7 days
    
    def get_usage_stats(self, days=7):
        """Get usage statistics for the last N days."""
        stats = {}
        
        for i in range(days):
            date = (datetime.now() - timedelta(days=i)).strftime('%Y-%m-%d')
            daily_stats = self.redis.hgetall(f'usage:daily:{date}')
            
            stats[date] = {
                key.decode(): int(value)
                for key, value in daily_stats.items()
            }
        
        return stats
 
@app.route('/api/analytics/usage')
@require_jwt_auth(['admin'])
def get_usage_analytics():
    days = request.args.get('days', 7, type=int)
    
    analytics = UsageAnalytics(redis_client)
    usage_stats = analytics.get_usage_stats(days)
    
    return jsonify({
        'period_days': days,
        'usage_by_date': usage_stats
    })
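
The snippets above define track_api_usage but never call it; one possible wiring point is an after_request hook (a sketch: using the X-API-Key header as the client identifier is an assumption):

usage_analytics = UsageAnalytics(redis_client)

@app.after_request
def record_usage(response):
    # Record one usage event per completed request
    usage_analytics.track_api_usage(
        endpoint=request.endpoint or 'unknown',
        client_id=request.headers.get('X-API-Key', 'anonymous'),
        method=request.method
    )
    return response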

Alerting System

Alert Configuration

import os
import smtplib
from datetime import datetime
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
 
class AlertManager:
    def __init__(self):
        self.smtp_server = os.getenv('SMTP_SERVER')
        self.smtp_port = int(os.getenv('SMTP_PORT', 587))
        self.smtp_username = os.getenv('SMTP_USERNAME')
        self.smtp_password = os.getenv('SMTP_PASSWORD')
        
        self.alert_recipients = [
            'ops-team@example.com',
            'api-team@example.com'
        ]
    
    def send_alert(self, subject, message, severity='INFO'):
        try:
            msg = MIMEMultipart()
            msg['From'] = self.smtp_username
            msg['To'] = ', '.join(self.alert_recipients)
            msg['Subject'] = f'[{severity}] API Alert: {subject}'
            
            body = f"""
            Alert Details:
            - Severity: {severity}
            - Timestamp: {datetime.utcnow().isoformat()}
            - Message: {message}
            
            Please investigate and take appropriate action.
            """
            
            msg.attach(MIMEText(body, 'plain'))
            
            server = smtplib.SMTP(self.smtp_server, self.smtp_port)
            server.starttls()
            server.login(self.smtp_username, self.smtp_password)
            server.send_message(msg)
            server.quit()
            
        except Exception as e:
            print(f"Failed to send alert: {str(e)}")
 
class MetricThresholds:
    def __init__(self, alert_manager):
        self.alert_manager = alert_manager
        self.thresholds = {
            'error_rate': {'warning': 5, 'critical': 10},  # percentage
            'response_time': {'warning': 1000, 'critical': 5000},  # milliseconds
            'requests_per_minute': {'warning': 1000, 'critical': 2000}
        }
    
    def check_error_rate(self, current_rate):
        if current_rate >= self.thresholds['error_rate']['critical']:
            self.alert_manager.send_alert(
                'High Error Rate',
                f'Error rate is {current_rate}% (threshold: {self.thresholds["error_rate"]["critical"]}%)',
                'CRITICAL'
            )
        elif current_rate >= self.thresholds['error_rate']['warning']:
            self.alert_manager.send_alert(
                'Elevated Error Rate',
                f'Error rate is {current_rate}% (threshold: {self.thresholds["error_rate"]["warning"]}%)',
                'WARNING'
            )
 
# Scheduled monitoring task
def run_monitoring_checks():
    """Run periodic monitoring checks."""
    alert_manager = AlertManager()
    thresholds = MetricThresholds(alert_manager)
    
    # Check error rates (implement your logic)
    current_error_rate = calculate_current_error_rate()
    thresholds.check_error_rate(current_error_rate)
    
    # Check response times
    avg_response_time = calculate_avg_response_time()
    if avg_response_time > thresholds.thresholds['response_time']['critical']:
        alert_manager.send_alert(
            'High Response Time',
            f'Average response time is {avg_response_time}ms',
            'CRITICAL'
        )
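
The snippet leaves scheduling open; one possible approach (assuming APScheduler is available) is to run the checks on a fixed interval in a background thread:

from apscheduler.schedulers.background import BackgroundScheduler

# Run the monitoring checks once a minute alongside the API process
scheduler = BackgroundScheduler()
scheduler.add_job(run_monitoring_checks, 'interval', minutes=1)
scheduler.start()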

Integration with External Tools

Grafana Dashboard

# docker-compose.yml for monitoring stack
version: '3.8'
services:
  prometheus:
    image: prom/prometheus
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
  
  grafana:
    image: grafana/grafana
    ports:
      - "3000:3000"
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin
    volumes:
      - grafana-storage:/var/lib/grafana
 
volumes:
  grafana-storage:
 
# prometheus.yml
global:
  scrape_interval: 15s
 
scrape_configs:
  - job_name: 'api-service'
    static_configs:
      - targets: ['api:8000']
    metrics_path: '/metrics'
    scrape_interval: 5s
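
Grafana dashboards are built from PromQL queries over the metrics exported earlier; those queries can be sanity-checked directly against Prometheus's HTTP API before building panels. A quick sketch, assuming the stack above is running locally:

import requests

PROMETHEUS_QUERY_URL = 'http://localhost:9090/api/v1/query'

queries = {
    'requests_per_second': 'sum(rate(api_requests_total[5m]))',
    'p95_latency_seconds': 'histogram_quantile(0.95, sum(rate(api_request_duration_seconds_bucket[5m])) by (le))'
}

for name, query in queries.items():
    result = requests.get(PROMETHEUS_QUERY_URL, params={'query': query}).json()
    print(name, result['data']['result'])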

ELK Stack Integration

import json
import logging
from datetime import datetime
from elasticsearch import Elasticsearch
from pythonjsonlogger import jsonlogger
 
class ElasticsearchHandler(logging.Handler):
    def __init__(self, es_host='localhost', es_port=9200, index_name='api-logs'):
        super().__init__()
        # Single-node, unauthenticated connection; adjust for a secured cluster
        self.es = Elasticsearch(f'http://{es_host}:{es_port}')
        self.index_name = index_name
    
    def emit(self, record):
        try:
            log_entry = self.format(record)
            self.es.index(
                index=f"{self.index_name}-{datetime.now().strftime('%Y.%m.%d')}",
                body=json.loads(log_entry)
            )
        except Exception:
            self.handleError(record)
 
# Configure structured logging for Elasticsearch
def setup_elasticsearch_logging():
    logger = logging.getLogger('api')
    
    # JSON formatter
    json_formatter = jsonlogger.JsonFormatter(
        '%(asctime)s %(name)s %(levelname)s %(message)s'
    )
    
    # Elasticsearch handler
    es_handler = ElasticsearchHandler()
    es_handler.setFormatter(json_formatter)
    
    logger.addHandler(es_handler)
    return logger
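
As a usage sketch (assuming an Elasticsearch node is reachable on localhost:9200), the handler is attached once at startup and subsequent log calls are indexed into the daily index:

es_logger = setup_elasticsearch_logging()

# Extra fields are merged into the JSON document by the formatter
es_logger.info('request completed', extra={'endpoint': 'get_order', 'status_code': 200})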

Comprehensive monitoring and analytics are essential for maintaining high-quality API services. By implementing proper metrics collection, logging, alerting, and analytics, teams can proactively identify and resolve issues while gaining valuable insights into API usage and performance patterns.