## Monitoring & Analytics
API monitoring and analytics provide crucial insights into system performance, usage patterns, and potential issues. Effective monitoring enables proactive maintenance and optimization of API services.
### Key Metrics

#### Performance Metrics
- **Response Time**: Average, median, and 95th-percentile latency
- **Throughput**: Requests per second/minute
- **Error Rates**: 4xx and 5xx error percentages
- **Availability**: Uptime percentage
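To make these concrete, the short sketch below computes average, median, and 95th-percentile latency along with an error rate from a window of raw samples; the sample values and variable names are purely illustrative.

```python
import statistics

# Hypothetical samples collected over one monitoring window
latencies_ms = [120, 95, 210, 180, 2500, 140, 160, 130, 115, 98]
status_codes = [200, 200, 404, 200, 500, 200, 201, 200, 200, 503]

avg_ms = statistics.mean(latencies_ms)
median_ms = statistics.median(latencies_ms)
# quantiles(n=20) returns 19 cut points; the last one approximates the 95th percentile
p95_ms = statistics.quantiles(latencies_ms, n=20)[18]

error_rate = sum(1 for code in status_codes if code >= 400) / len(status_codes) * 100

print(f"avg={avg_ms:.1f}ms median={median_ms:.1f}ms p95={p95_ms:.1f}ms errors={error_rate:.1f}%")
```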
#### Business Metrics
- **API Usage**: Requests per client/endpoint
- **Rate Limit Hits**: Clients hitting rate limits
- **Authentication Failures**: Failed auth attempts
- **Popular Endpoints**: Most frequently used APIs
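These business metrics can be exposed through the same Prometheus client used in the next section; the sketch below is one possible shape for such counters, and the metric names and label sets are assumptions rather than part of the setup shown later.

```python
from prometheus_client import Counter

# Illustrative business-level counters; names and labels are assumptions
USAGE_BY_CLIENT = Counter(
    'api_usage_total',
    'Requests per client and endpoint',
    ['client_id', 'endpoint']
)
RATE_LIMIT_HITS = Counter(
    'api_rate_limit_hits_total',
    'Requests rejected because a client exceeded its rate limit',
    ['client_id']
)
AUTH_FAILURES = Counter(
    'api_auth_failures_total',
    'Failed authentication attempts',
    ['reason']
)

# Example: record a rate-limit rejection for a client
RATE_LIMIT_HITS.labels(client_id='client-42').inc()
```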
### Monitoring Implementation

#### Custom Metrics Collection
```python
from prometheus_client import Counter, Histogram, Gauge, generate_latest
from flask import request
import time
from functools import wraps

# Define Prometheus metrics
REQUEST_COUNT = Counter(
    'api_requests_total',
    'Total API requests',
    ['method', 'endpoint', 'status']
)

REQUEST_LATENCY = Histogram(
    'api_request_duration_seconds',
    'API request duration',
    ['method', 'endpoint']
)

ACTIVE_CONNECTIONS = Gauge(
    'api_active_connections',
    'Number of active connections'
)

ERROR_RATE = Counter(
    'api_errors_total',
    'Total API errors',
    ['endpoint', 'error_type']
)

def track_metrics(f):
    @wraps(f)
    def decorated_function(*args, **kwargs):
        start_time = time.time()
        endpoint = request.endpoint or 'unknown'
        method = request.method

        try:
            response = f(*args, **kwargs)
            status_code = response.status_code if hasattr(response, 'status_code') else 200

            # Record metrics
            REQUEST_COUNT.labels(
                method=method,
                endpoint=endpoint,
                status=status_code
            ).inc()

            duration = time.time() - start_time
            REQUEST_LATENCY.labels(
                method=method,
                endpoint=endpoint
            ).observe(duration)

            return response
        except Exception as e:
            # Record error metrics
            ERROR_RATE.labels(
                endpoint=endpoint,
                error_type=type(e).__name__
            ).inc()
            REQUEST_COUNT.labels(
                method=method,
                endpoint=endpoint,
                status=500
            ).inc()
            raise

    return decorated_function

@app.route('/metrics')
def metrics():
    return generate_latest()
```
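Once the decorator is defined, individual routes opt in by stacking it under the route registration; the endpoint and payload below are hypothetical.

```python
# Hypothetical route instrumented with the decorator above
@app.route('/api/orders/<int:order_id>')
@track_metrics
def get_order(order_id):
    # Placeholder payload; Flask serializes the returned dict to JSON
    return {'id': order_id, 'status': 'shipped'}
```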
#### Health Checks
```python
import time
import psutil
from datetime import datetime
from flask import jsonify
from sqlalchemy import text

class HealthChecker:
    def __init__(self, db, redis_client):
        self.db = db
        self.redis_client = redis_client

    def check_database(self):
        try:
            start = time.time()
            self.db.session.execute(text('SELECT 1'))
            return {'status': 'healthy', 'response_time': time.time() - start}
        except Exception as e:
            return {'status': 'unhealthy', 'error': str(e)}

    def check_redis(self):
        try:
            start = time.time()
            self.redis_client.ping()
            response_time = time.time() - start
            return {'status': 'healthy', 'response_time': response_time}
        except Exception as e:
            return {'status': 'unhealthy', 'error': str(e)}

    def check_disk_space(self):
        disk = psutil.disk_usage('/')
        free_percent = (disk.free / disk.total) * 100
        if free_percent < 10:
            status = 'critical'
        elif free_percent < 20:
            status = 'warning'
        else:
            status = 'healthy'
        return {
            'status': status,
            'free_space_percent': free_percent,
            'free_space_gb': disk.free // (1024**3)
        }

    def check_memory(self):
        memory = psutil.virtual_memory()
        if memory.percent > 90:
            status = 'critical'
        elif memory.percent > 80:
            status = 'warning'
        else:
            status = 'healthy'
        return {
            'status': status,
            'used_percent': memory.percent,
            'available_gb': memory.available // (1024**3)
        }

@app.route('/health')
def health_check():
    health_checker = HealthChecker(db, redis_client)
    checks = {
        'database': health_checker.check_database(),
        'redis': health_checker.check_redis(),
        'disk': health_checker.check_disk_space(),
        'memory': health_checker.check_memory()
    }

    # Determine overall status, treating 'critical' as the most severe
    overall_status = 'healthy'
    for check in checks.values():
        if check['status'] == 'critical':
            overall_status = 'critical'
            break
        elif check['status'] == 'unhealthy':
            overall_status = 'unhealthy'
        elif check['status'] == 'warning' and overall_status == 'healthy':
            overall_status = 'warning'

    response_code = 200 if overall_status in ['healthy', 'warning'] else 503
    return jsonify({
        'status': overall_status,
        'timestamp': datetime.utcnow().isoformat(),
        'checks': checks
    }), response_code
```
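When the service runs behind an orchestrator, it can also help to expose a cheaper liveness endpoint that skips the dependency checks, so that a slow database does not trigger unnecessary restarts. This is a sketch of that pattern, not part of the health checker above.

```python
@app.route('/health/live')
def liveness_check():
    # Liveness only confirms the process is serving requests;
    # dependency failures are still reported by /health.
    return jsonify({'status': 'alive'}), 200
```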
### Logging Strategies

#### Structured Logging
```python
import json
import logging
import time
from datetime import datetime
from flask import request, g

class APILogger:
    def __init__(self, app):
        self.app = app
        self.setup_logging()

    def setup_logging(self):
        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )

        # File handler for API logs
        file_handler = logging.FileHandler('api.log')
        file_handler.setFormatter(formatter)

        # Console handler
        console_handler = logging.StreamHandler()
        console_handler.setFormatter(formatter)

        # Configure logger
        logger = logging.getLogger('api')
        logger.setLevel(logging.INFO)
        logger.addHandler(file_handler)
        logger.addHandler(console_handler)
        self.logger = logger

    def log_request(self, response):
        log_data = {
            'timestamp': datetime.utcnow().isoformat(),
            'method': request.method,
            'url': request.url,
            'status_code': response.status_code,
            'response_time': round(time.time() - getattr(g, 'request_start_time', time.time()), 4),
            'user_agent': request.headers.get('User-Agent'),
            'ip_address': request.remote_addr,
            'user_id': getattr(request, 'user_id', None),
            'request_id': getattr(g, 'request_id', None)
        }

        # Log errors with more detail
        if response.status_code >= 400:
            log_data['error_details'] = {
                'request_data': request.get_json(silent=True) if request.is_json else None,
                'headers': dict(request.headers)
            }
            self.logger.error(json.dumps(log_data))
        else:
            self.logger.info(json.dumps(log_data))
```
#### Request Tracing
```python
import time
import uuid
from flask import g, request

@app.before_request
def before_request():
    g.request_start_time = time.time()
    g.request_id = request.headers.get('X-Request-ID') or str(uuid.uuid4())

# Add request ID to response headers
@app.after_request
def add_request_id(response):
    response.headers['X-Request-ID'] = g.request_id
    return response

# Create the logger once; instantiating it per request would add duplicate handlers
api_logger = APILogger(app)

@app.after_request
def log_request(response):
    duration = time.time() - g.request_start_time

    # Update metrics (skip this if routes already use @track_metrics,
    # otherwise each request is observed twice)
    REQUEST_LATENCY.labels(
        method=request.method,
        endpoint=request.endpoint or 'unknown'
    ).observe(duration)

    # Log request
    api_logger.log_request(response)
    return response
```
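For the request ID to be useful beyond a single service, it should be propagated on outgoing calls. The helper below is a sketch that assumes the `requests` library is available and uses a placeholder downstream URL.

```python
import requests

def call_downstream(path, **kwargs):
    """Forward the current request ID so downstream logs can be correlated."""
    headers = kwargs.pop('headers', {})
    headers['X-Request-ID'] = g.request_id
    # 'https://inventory.internal' is a placeholder downstream service
    return requests.get(f'https://inventory.internal{path}', headers=headers, **kwargs)
```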
### Analytics Dashboard

#### Usage Analytics
```python
from datetime import datetime, timedelta

class UsageAnalytics:
    def __init__(self, redis_client):
        self.redis = redis_client

    def track_api_usage(self, endpoint, client_id, method='GET'):
        """Track API usage in Redis."""
        now = datetime.now()
        today = now.strftime('%Y-%m-%d')
        hour = now.strftime('%H')

        # Daily stats
        self.redis.hincrby(f'usage:daily:{today}', f'{endpoint}:{method}', 1)
        self.redis.hincrby(f'usage:daily:{today}:client', client_id, 1)

        # Hourly stats
        self.redis.hincrby(f'usage:hourly:{today}:{hour}', f'{endpoint}:{method}', 1)

        # Set expiration
        self.redis.expire(f'usage:daily:{today}', 86400 * 30)        # 30 days
        self.redis.expire(f'usage:hourly:{today}:{hour}', 86400 * 7) # 7 days

    def get_usage_stats(self, days=7):
        """Get usage statistics for the last N days."""
        stats = {}
        for i in range(days):
            date = (datetime.now() - timedelta(days=i)).strftime('%Y-%m-%d')
            daily_stats = self.redis.hgetall(f'usage:daily:{date}')
            stats[date] = {
                key.decode(): int(value)
                for key, value in daily_stats.items()
            }
        return stats

@app.route('/api/analytics/usage')
@require_jwt_auth(['admin'])
def get_usage_analytics():
    days = request.args.get('days', 7, type=int)
    analytics = UsageAnalytics(redis_client)
    usage_stats = analytics.get_usage_stats(days)

    return jsonify({
        'period_days': days,
        'usage_by_date': usage_stats
    })
```
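Collection can be wired into the request lifecycle with a hook; the sketch below assumes a `client_id` attribute has been attached to the request by an authentication layer, which is not shown here.

```python
analytics = UsageAnalytics(redis_client)

@app.before_request
def track_usage():
    # 'client_id' is assumed to be set by an earlier auth step; fall back otherwise
    client_id = getattr(request, 'client_id', 'anonymous')
    analytics.track_api_usage(
        endpoint=request.endpoint or 'unknown',
        client_id=client_id,
        method=request.method
    )
```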
### Alerting System

#### Alert Configuration
```python
import os
import smtplib
from datetime import datetime
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart

class AlertManager:
    def __init__(self):
        self.smtp_server = os.getenv('SMTP_SERVER')
        self.smtp_port = int(os.getenv('SMTP_PORT', 587))
        self.smtp_username = os.getenv('SMTP_USERNAME')
        self.smtp_password = os.getenv('SMTP_PASSWORD')
        self.alert_recipients = [
            'ops-team@example.com',
            'api-team@example.com'
        ]

    def send_alert(self, subject, message, severity='INFO'):
        try:
            msg = MIMEMultipart()
            msg['From'] = self.smtp_username
            msg['To'] = ', '.join(self.alert_recipients)
            msg['Subject'] = f'[{severity}] API Alert: {subject}'

            body = f"""
            Alert Details:
            - Severity: {severity}
            - Timestamp: {datetime.utcnow().isoformat()}
            - Message: {message}

            Please investigate and take appropriate action.
            """
            msg.attach(MIMEText(body, 'plain'))

            server = smtplib.SMTP(self.smtp_server, self.smtp_port)
            server.starttls()
            server.login(self.smtp_username, self.smtp_password)
            server.send_message(msg)
            server.quit()
        except Exception as e:
            print(f"Failed to send alert: {str(e)}")

class MetricThresholds:
    def __init__(self, alert_manager):
        self.alert_manager = alert_manager
        self.thresholds = {
            'error_rate': {'warning': 5, 'critical': 10},          # percentage
            'response_time': {'warning': 1000, 'critical': 5000},  # milliseconds
            'requests_per_minute': {'warning': 1000, 'critical': 2000}
        }

    def check_error_rate(self, current_rate):
        if current_rate >= self.thresholds['error_rate']['critical']:
            self.alert_manager.send_alert(
                'High Error Rate',
                f'Error rate is {current_rate}% (threshold: {self.thresholds["error_rate"]["critical"]}%)',
                'CRITICAL'
            )
        elif current_rate >= self.thresholds['error_rate']['warning']:
            self.alert_manager.send_alert(
                'Elevated Error Rate',
                f'Error rate is {current_rate}% (threshold: {self.thresholds["error_rate"]["warning"]}%)',
                'WARNING'
            )

# Scheduled monitoring task
def run_monitoring_checks():
    """Run periodic monitoring checks."""
    alert_manager = AlertManager()
    thresholds = MetricThresholds(alert_manager)

    # Check error rates (calculate_current_error_rate is application-specific)
    current_error_rate = calculate_current_error_rate()
    thresholds.check_error_rate(current_error_rate)

    # Check response times (calculate_avg_response_time is likewise application-specific)
    avg_response_time = calculate_avg_response_time()
    if avg_response_time > thresholds.thresholds['response_time']['critical']:
        alert_manager.send_alert(
            'High Response Time',
            f'Average response time is {avg_response_time}ms',
            'CRITICAL'
        )
```
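`run_monitoring_checks()` still needs something to invoke it on a schedule. A dependency-free option is a daemon thread, sketched below; a cron job, Celery beat, or APScheduler would work just as well.

```python
import threading
import time

def start_monitoring_loop(interval_seconds=60):
    """Run run_monitoring_checks() periodically on a background daemon thread."""
    def loop():
        while True:
            run_monitoring_checks()
            time.sleep(interval_seconds)

    thread = threading.Thread(target=loop, daemon=True)
    thread.start()
    return thread
```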
### Integration with External Tools

#### Grafana Dashboard
```yaml
# docker-compose.yml for monitoring stack
version: '3.8'

services:
  prometheus:
    image: prom/prometheus
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml

  grafana:
    image: grafana/grafana
    ports:
      - "3000:3000"
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin
    volumes:
      - grafana-storage:/var/lib/grafana

volumes:
  grafana-storage:
```

```yaml
# prometheus.yml
global:
  scrape_interval: 15s

scrape_configs:
  - job_name: 'api-service'
    static_configs:
      - targets: ['api:8000']
    metrics_path: '/metrics'
    scrape_interval: 5s
```
#### ELK Stack Integration
```python
import json
import logging
from datetime import datetime
from elasticsearch import Elasticsearch
from pythonjsonlogger import jsonlogger

class ElasticsearchHandler(logging.Handler):
    def __init__(self, es_host='localhost', es_port=9200, index_name='api-logs'):
        super().__init__()
        self.es = Elasticsearch([{'host': es_host, 'port': es_port}])
        self.index_name = index_name

    def emit(self, record):
        try:
            log_entry = self.format(record)
            # Write each record to a daily index (e.g. api-logs-YYYY.MM.DD)
            self.es.index(
                index=f"{self.index_name}-{datetime.now().strftime('%Y.%m.%d')}",
                body=json.loads(log_entry)
            )
        except Exception:
            self.handleError(record)

# Configure structured logging for Elasticsearch
def setup_elasticsearch_logging():
    logger = logging.getLogger('api')

    # JSON formatter
    json_formatter = jsonlogger.JsonFormatter(
        '%(asctime)s %(name)s %(levelname)s %(message)s'
    )

    # Elasticsearch handler
    es_handler = ElasticsearchHandler()
    es_handler.setFormatter(json_formatter)
    logger.addHandler(es_handler)

    return logger
```
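A minimal usage example, assuming Elasticsearch is reachable at the defaults above:

```python
logger = setup_elasticsearch_logging()
# Extra fields are merged into the JSON document by the formatter
logger.info('api started', extra={'component': 'bootstrap'})
```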
Comprehensive monitoring and analytics are essential for maintaining high-quality API services. By combining metrics collection, structured logging, alerting, and usage analytics, teams can proactively identify and resolve issues while gaining clear insight into how their APIs are used and how they perform.