Anomaly Detection

Anomaly detection identifies data points, patterns, or events that deviate significantly from expected behavior. It is crucial for fraud detection, network security, quality control, and monitoring systems.

Types of Anomalies

Point Anomalies

Individual data instances that are anomalous with respect to the rest of the data.

import numpy as np
import matplotlib.pyplot as plt
from sklearn.datasets import make_blobs
 
# Generate data with point anomalies
np.random.seed(42)
X_normal, _ = make_blobs(n_samples=300, centers=1, cluster_std=1.0)
X_anomalies = np.random.uniform(low=-6, high=6, size=(20, 2))
X = np.vstack([X_normal, X_anomalies])
 
plt.figure(figsize=(10, 6))
plt.scatter(X_normal[:, 0], X_normal[:, 1], alpha=0.6, label='Normal')
plt.scatter(X_anomalies[:, 0], X_anomalies[:, 1], c='red', alpha=0.8, label='Anomalies')
plt.title('Point Anomalies Example')
plt.legend()
plt.show()

Contextual Anomalies

Data points that are anomalous in a specific context but normal otherwise.

# Time series with contextual anomalies
import pandas as pd
from datetime import datetime, timedelta
 
# Generate seasonal time series
dates = pd.date_range(start='2023-01-01', end='2023-12-31', freq='D')
seasonal_pattern = 10 + 5 * np.sin(2 * np.pi * np.arange(len(dates)) / 365.25)
noise = np.random.normal(0, 0.5, len(dates))
ts_normal = seasonal_pattern + noise
 
# Add contextual anomalies: values within the series' overall range,
# but unusual for the time of year at which they occur
ts_anomalous = ts_normal.copy()
ts_anomalous[60] = 5    # early March: expected ~14, value typical of the autumn trough
ts_anomalous[180] = 15  # late June: expected ~10, value typical of the spring peak
 
plt.figure(figsize=(12, 6))
plt.plot(dates, ts_normal, alpha=0.7, label='Normal Pattern')
plt.plot(dates, ts_anomalous, 'r-', alpha=0.8, label='With Contextual Anomalies')
plt.scatter(dates[[60, 180]], ts_anomalous[[60, 180]], 
           c='red', s=100, zorder=5, label='Anomalies')
plt.title('Contextual Anomalies in Time Series')
plt.xlabel('Date')
plt.ylabel('Value')
plt.legend()
plt.xticks(rotation=45)
plt.show()

Collective Anomalies

Collections of related data points that are anomalous when considered together, even though each individual point may look normal in isolation.

# Network traffic with collective anomaly (DDoS attack pattern)
np.random.seed(42)
normal_traffic = np.random.poisson(lam=50, size=200)
# Sudden spike representing collective anomaly
attack_period = np.random.poisson(lam=500, size=20)
traffic = np.concatenate([
    normal_traffic[:100],
    attack_period,
    normal_traffic[100:]
])
 
plt.figure(figsize=(12, 6))
plt.plot(traffic, alpha=0.8)
plt.axvspan(100, 120, alpha=0.3, color='red', label='Collective Anomaly')
plt.title('Collective Anomaly in Network Traffic')
plt.xlabel('Time')
plt.ylabel('Requests per minute')
plt.legend()
plt.show()

Statistical Methods

Z-Score Based Detection

from scipy import stats
 
class ZScoreAnomalyDetector:
    def __init__(self, threshold=3):
        self.threshold = threshold
        self.mean_ = None
        self.std_ = None
    
    def fit(self, X):
        self.mean_ = np.mean(X)
        self.std_ = np.std(X)
        return self
    
    def predict(self, X):
        z_scores = np.abs((X - self.mean_) / self.std_)
        return (z_scores > self.threshold).astype(int)
    
    def decision_function(self, X):
        return np.abs((X - self.mean_) / self.std_)
 
# Example with synthetic data
np.random.seed(42)
data = np.random.normal(0, 1, 1000)
# Add some anomalies
data[50] = 5
data[150] = -4.5
data[300] = 6
 
detector = ZScoreAnomalyDetector(threshold=3)
detector.fit(data)
predictions = detector.predict(data)
 
plt.figure(figsize=(12, 6))
plt.plot(data, alpha=0.7, label='Data')
plt.scatter(np.where(predictions == 1)[0], data[predictions == 1], 
           c='red', s=50, label='Anomalies')
plt.axhline(y=detector.mean_ + 3*detector.std_, color='r', linestyle='--', alpha=0.5)
plt.axhline(y=detector.mean_ - 3*detector.std_, color='r', linestyle='--', alpha=0.5)
plt.title('Z-Score Based Anomaly Detection')
plt.legend()
plt.show()
 
print(f"Detected {np.sum(predictions)} anomalies")

Modified Z-Score (Robust)

def modified_z_score(data):
    """
    Calculate the modified Z-score using the median and MAD (Median Absolute Deviation).
    Because the median and MAD are barely affected by outliers, this is more robust
    than the standard Z-score, whose mean and standard deviation are inflated by them.
    """
    median = np.median(data)
    mad = np.median(np.abs(data - median))
    # 0.6745 (the 0.75 quantile of the standard normal) scales MAD so the score is
    # comparable to a standard Z-score on normally distributed data.
    # Note: MAD can be zero if more than half the values are identical.
    modified_z_scores = 0.6745 * (data - median) / mad
    return modified_z_scores
 
# Compare standard vs modified Z-score
data_with_outliers = np.concatenate([
    np.random.normal(0, 1, 950),
    np.random.normal(10, 0.5, 50)  # Outlier cluster
])
 
z_scores = np.abs((data_with_outliers - np.mean(data_with_outliers)) / np.std(data_with_outliers))
modified_z_scores = np.abs(modified_z_score(data_with_outliers))
 
plt.figure(figsize=(15, 5))
 
plt.subplot(1, 3, 1)
plt.hist(data_with_outliers, bins=50, alpha=0.7)
plt.title('Data Distribution')
 
plt.subplot(1, 3, 2)
plt.scatter(range(len(data_with_outliers)), z_scores, alpha=0.6)
plt.axhline(y=3, color='r', linestyle='--', label='Threshold')
plt.title('Standard Z-Score')
plt.ylabel('Z-Score')
plt.legend()
 
plt.subplot(1, 3, 3)
plt.scatter(range(len(data_with_outliers)), modified_z_scores, alpha=0.6)
plt.axhline(y=3.5, color='r', linestyle='--', label='Threshold')
plt.title('Modified Z-Score')
plt.ylabel('Modified Z-Score')
plt.legend()
 
plt.tight_layout()
plt.show()
 
# Count anomalies detected by each method
standard_anomalies = np.sum(z_scores > 3)
modified_anomalies = np.sum(modified_z_scores > 3.5)
 
print(f"Standard Z-Score anomalies: {standard_anomalies}")
print(f"Modified Z-Score anomalies: {modified_anomalies}")

Machine Learning Approaches

Isolation Forest

from sklearn.ensemble import IsolationForest
from sklearn.datasets import make_classification
 
# Generate dataset with anomalies
X, _ = make_classification(
    n_samples=1000, n_features=2, n_informative=2, 
    n_redundant=0, n_clusters_per_class=1, random_state=42
)
 
# Add anomalies
anomalies = np.random.uniform(low=-8, high=8, size=(50, 2))
X_with_anomalies = np.vstack([X, anomalies])
y_true = np.concatenate([np.zeros(len(X)), np.ones(len(anomalies))])
 
# Isolation Forest (contamination ≈ expected fraction of anomalies in the data)
iso_forest = IsolationForest(contamination=0.1, random_state=42)
y_pred = iso_forest.fit_predict(X_with_anomalies)
y_pred = np.where(y_pred == -1, 1, 0)  # Convert to 0/1
 
# Visualization
plt.figure(figsize=(15, 5))
 
plt.subplot(1, 3, 1)
plt.scatter(X[:, 0], X[:, 1], alpha=0.6, label='Normal')
plt.scatter(anomalies[:, 0], anomalies[:, 1], c='red', alpha=0.8, label='True Anomalies')
plt.title('Ground Truth')
plt.legend()
 
plt.subplot(1, 3, 2)
normal_pred = X_with_anomalies[y_pred == 0]
anomaly_pred = X_with_anomalies[y_pred == 1]
plt.scatter(normal_pred[:, 0], normal_pred[:, 1], alpha=0.6, label='Predicted Normal')
plt.scatter(anomaly_pred[:, 0], anomaly_pred[:, 1], c='red', alpha=0.8, label='Predicted Anomalies')
plt.title('Isolation Forest Predictions')
plt.legend()
 
plt.subplot(1, 3, 3)
# Decision boundary
xx, yy = np.meshgrid(np.linspace(-8, 8, 100), np.linspace(-8, 8, 100))
Z = iso_forest.decision_function(np.c_[xx.ravel(), yy.ravel()])
Z = Z.reshape(xx.shape)
plt.contour(xx, yy, Z, levels=[0], linewidths=2, colors='red')
plt.scatter(X_with_anomalies[:, 0], X_with_anomalies[:, 1], c=y_true, cmap='viridis', alpha=0.6)
plt.title('Decision Boundary')
 
plt.tight_layout()
plt.show()
 
# Evaluation
from sklearn.metrics import classification_report, confusion_matrix
 
print("Classification Report:")
print(classification_report(y_true, y_pred, target_names=['Normal', 'Anomaly']))
print("\nConfusion Matrix:")
print(confusion_matrix(y_true, y_pred))
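
Besides the hard -1/+1 labels from fit_predict, a fitted IsolationForest also exposes continuous anomaly scores, which let you evaluate ranking quality without committing to a contamination threshold. A minimal sketch reusing X_with_anomalies and y_true from above (in scikit-learn, lower score_samples values mean more anomalous, so the sign is flipped):

from sklearn.metrics import roc_auc_score

# Continuous anomaly scores: negate score_samples so higher = more anomalous
anomaly_scores = -iso_forest.score_samples(X_with_anomalies)

auc = roc_auc_score(y_true, anomaly_scores)
print(f"Isolation Forest ranking AUC-ROC: {auc:.3f}")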

One-Class SVM

from sklearn.svm import OneClassSVM
from sklearn.preprocessing import StandardScaler
 
# Prepare data
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X_with_anomalies)
 
# One-Class SVM
oc_svm = OneClassSVM(kernel='rbf', gamma='scale', nu=0.1)
y_pred_svm = oc_svm.fit_predict(X_scaled)
y_pred_svm = np.where(y_pred_svm == -1, 1, 0)
 
# Local Outlier Factor
from sklearn.neighbors import LocalOutlierFactor
 
lof = LocalOutlierFactor(n_neighbors=20, contamination=0.1)
y_pred_lof = lof.fit_predict(X_scaled)
y_pred_lof = np.where(y_pred_lof == -1, 1, 0)
 
# Compare methods
methods = {
    'Isolation Forest': y_pred,
    'One-Class SVM': y_pred_svm,
    'LOF': y_pred_lof
}
 
fig, axes = plt.subplots(1, 3, figsize=(18, 5))
 
for i, (method, predictions) in enumerate(methods.items()):
    normal_pred = X_with_anomalies[predictions == 0]
    anomaly_pred = X_with_anomalies[predictions == 1]
    
    axes[i].scatter(normal_pred[:, 0], normal_pred[:, 1], alpha=0.6, label='Normal')
    axes[i].scatter(anomaly_pred[:, 0], anomaly_pred[:, 1], c='red', alpha=0.8, label='Anomalies')
    axes[i].set_title(f'{method}')
    axes[i].legend()
 
plt.tight_layout()
plt.show()
 
# Performance comparison
for method, predictions in methods.items():
    precision = np.sum((predictions == 1) & (y_true == 1)) / np.sum(predictions == 1)
    recall = np.sum((predictions == 1) & (y_true == 1)) / np.sum(y_true == 1)
    f1 = 2 * precision * recall / (precision + recall)
    print(f"{method:15} - Precision: {precision:.3f}, Recall: {recall:.3f}, F1: {f1:.3f}")

Autoencoders for Anomaly Detection

import tensorflow as tf
from tensorflow.keras import layers, Model
 
class AnomalyAutoencoder(Model):
    def __init__(self, input_dim, encoding_dim=32):
        super(AnomalyAutoencoder, self).__init__()
        self.input_dim = input_dim
        self.encoding_dim = encoding_dim
        
        # Encoder
        self.encoder = tf.keras.Sequential([
            layers.Dense(64, activation='relu'),
            layers.Dense(32, activation='relu'),
            layers.Dense(encoding_dim, activation='relu')
        ])
        
        # Decoder
        self.decoder = tf.keras.Sequential([
            layers.Dense(32, activation='relu'),
            layers.Dense(64, activation='relu'),
            layers.Dense(input_dim, activation='linear')
        ])
    
    def call(self, x):
        encoded = self.encoder(x)
        decoded = self.decoder(encoded)
        return decoded
 
# Generate normal training data (without anomalies)
from sklearn.datasets import make_blobs
 
X_train, _ = make_blobs(n_samples=1000, centers=3, cluster_std=1.5, random_state=42)
 
# Normalize data
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
 
# Create and train autoencoder
autoencoder = AnomalyAutoencoder(input_dim=X_train_scaled.shape[1], encoding_dim=8)
autoencoder.compile(optimizer='adam', loss='mse')
 
history = autoencoder.fit(
    X_train_scaled, X_train_scaled,
    epochs=100,
    batch_size=32,
    validation_split=0.2,
    verbose=0
)
 
# Test on data with anomalies
X_test = np.vstack([X_train, anomalies])
X_test_scaled = scaler.transform(X_test)
 
# Calculate reconstruction error
X_pred = autoencoder.predict(X_test_scaled)
reconstruction_error = tf.keras.losses.mse(X_test_scaled, X_pred)
 
# Set threshold based on training data reconstruction error
train_pred = autoencoder.predict(X_train_scaled)
train_error = tf.keras.losses.mse(X_train_scaled, train_pred)
threshold = np.percentile(train_error, 95)  # 95th percentile
 
# Predict anomalies
y_pred_ae = (reconstruction_error > threshold).numpy().astype(int)
 
# Visualization
plt.figure(figsize=(15, 5))
 
plt.subplot(1, 3, 1)
plt.plot(history.history['loss'], label='Training Loss')
plt.plot(history.history['val_loss'], label='Validation Loss')
plt.title('Training History')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
 
plt.subplot(1, 3, 2)
plt.scatter(range(len(reconstruction_error)), reconstruction_error, alpha=0.6)
plt.axhline(y=threshold, color='r', linestyle='--', label=f'Threshold ({threshold:.3f})')
plt.title('Reconstruction Error')
plt.xlabel('Sample Index')
plt.ylabel('MSE')
plt.legend()
 
plt.subplot(1, 3, 3)
normal_ae = X_test[y_pred_ae == 0]
anomaly_ae = X_test[y_pred_ae == 1]
plt.scatter(normal_ae[:, 0], normal_ae[:, 1], alpha=0.6, label='Normal')
plt.scatter(anomaly_ae[:, 0], anomaly_ae[:, 1], c='red', alpha=0.8, label='Anomalies')
plt.title('Autoencoder Anomaly Detection')
plt.legend()
 
plt.tight_layout()
plt.show()
 
print(f"Autoencoder detected {np.sum(y_pred_ae)} anomalies")

Time Series Anomaly Detection

STL Decomposition + Statistical Detection

from statsmodels.tsa.seasonal import STL
import pandas as pd
 
def detect_time_series_anomalies(ts, period=365, threshold=3):
    """
    Detect anomalies in a time series using STL decomposition followed by
    a modified Z-score test on the residuals.
    """
    # STL decomposition (period = number of observations per seasonal cycle,
    # e.g. 365 for daily data with yearly seasonality)
    stl = STL(ts, period=period, robust=True)
    decomposition = stl.fit()
    
    # Use residuals for anomaly detection
    residuals = decomposition.resid
    
    # Modified Z-score on residuals (robust, so the anomalies themselves
    # do not distort the threshold)
    median_resid = np.median(residuals)
    mad_resid = np.median(np.abs(residuals - median_resid))
    modified_z_scores = 0.6745 * (residuals - median_resid) / mad_resid
    
    anomalies = np.asarray(np.abs(modified_z_scores) > threshold)
    
    return anomalies, decomposition
 
# Generate time series with seasonal pattern and anomalies
np.random.seed(42)
dates = pd.date_range('2020-01-01', periods=365*2, freq='D')
seasonal = 10 * np.sin(2 * np.pi * np.arange(len(dates)) / 365)
trend = 0.01 * np.arange(len(dates))
noise = np.random.normal(0, 1, len(dates))
ts = seasonal + trend + noise
 
# Add anomalies
ts[100] += 15  # Spike
ts[300:310] -= 10  # Drop period
ts[500] += 20  # Another spike
 
ts = pd.Series(ts, index=dates)
 
# Detect anomalies
anomalies, decomposition = detect_time_series_anomalies(ts, period=365)
 
# Plot results
fig, axes = plt.subplots(4, 1, figsize=(15, 12))
 
# Original time series with anomalies
axes[0].plot(ts.index, ts.values, alpha=0.8, label='Time Series')
axes[0].scatter(ts.index[anomalies], ts.values[anomalies], 
               c='red', s=50, label='Anomalies', zorder=5)
axes[0].set_title('Original Time Series')
axes[0].legend()
 
# Trend
axes[1].plot(ts.index, decomposition.trend, color='orange', label='Trend')
axes[1].set_title('Trend Component')
 
# Seasonal
axes[2].plot(ts.index, decomposition.seasonal, color='green', label='Seasonal')
axes[2].set_title('Seasonal Component')
 
# Residuals
axes[3].plot(ts.index, decomposition.resid, alpha=0.8, label='Residuals')
axes[3].scatter(ts.index[anomalies], decomposition.resid[anomalies], 
               c='red', s=50, label='Anomalies', zorder=5)
axes[3].set_title('Residuals')
axes[3].legend()
 
plt.tight_layout()
plt.show()
 
print(f"Detected {np.sum(anomalies)} anomalies")

LSTM Autoencoder for Time Series

def create_sequences(data, seq_length):
    """Create sequences for LSTM input"""
    sequences = []
    for i in range(len(data) - seq_length):
        sequences.append(data[i:i+seq_length])
    return np.array(sequences)
 
class LSTMAnomalyDetector:
    def __init__(self, sequence_length=30, encoding_dim=50):
        self.sequence_length = sequence_length
        self.encoding_dim = encoding_dim
        self.model = None
        self.scaler = StandardScaler()
    
    def build_model(self, input_shape):
        # Encoder
        encoder_input = layers.Input(shape=input_shape)
        encoder = layers.LSTM(self.encoding_dim, return_sequences=False)(encoder_input)
        encoder = layers.RepeatVector(input_shape[0])(encoder)
        
        # Decoder
        decoder = layers.LSTM(self.encoding_dim, return_sequences=True)(encoder)
        decoder_output = layers.TimeDistributed(layers.Dense(input_shape[1]))(decoder)
        
        self.model = Model(encoder_input, decoder_output)
        self.model.compile(optimizer='adam', loss='mse')
    
    def fit(self, X, epochs=50, batch_size=32):
        # Normalize data
        X_scaled = self.scaler.fit_transform(X.reshape(-1, 1)).reshape(X.shape)
        
        # Create sequences and add a feature axis: the LSTM expects (samples, timesteps, features)
        sequences = create_sequences(X_scaled, self.sequence_length)
        sequences = sequences.reshape(-1, self.sequence_length, 1)
        
        # Build model if not exists
        if self.model is None:
            self.build_model((self.sequence_length, 1))
        
        # Train
        history = self.model.fit(
            sequences, sequences,
            epochs=epochs,
            batch_size=batch_size,
            validation_split=0.2,
            verbose=0
        )
        
        return history
    
    def predict_anomalies(self, X, threshold_percentile=95):
        # Normalize
        X_scaled = self.scaler.transform(X.reshape(-1, 1)).reshape(X.shape)
        
        # Create sequences with the same (samples, timesteps, 1) shape used for training
        sequences = create_sequences(X_scaled, self.sequence_length)
        sequences = sequences.reshape(-1, self.sequence_length, 1)
        
        # Predict
        predictions = self.model.predict(sequences)
        
        # Calculate reconstruction error
        mse = np.mean(np.power(sequences - predictions, 2), axis=(1, 2))
        
        # Set threshold
        threshold = np.percentile(mse, threshold_percentile)
        
        # Detect anomalies
        anomalies = mse > threshold
        
        # Extend to original length (pad beginning with False)
        anomaly_flags = np.zeros(len(X), dtype=bool)
        anomaly_flags[self.sequence_length:] = anomalies
        
        return anomaly_flags, mse, threshold
 
# Generate synthetic time series
np.random.seed(42)
t = np.linspace(0, 100, 2000)
ts_normal = np.sin(0.1 * t) + 0.5 * np.sin(0.3 * t) + np.random.normal(0, 0.1, len(t))
 
# Add anomalies
ts_with_anomalies = ts_normal.copy()
ts_with_anomalies[500:520] += 2  # Anomaly period
ts_with_anomalies[1200] -= 3    # Point anomaly
ts_with_anomalies[1600:1650] += 1.5  # Another anomaly period
 
# Train LSTM detector
detector = LSTMAnomalyDetector(sequence_length=50, encoding_dim=32)
history = detector.fit(ts_normal, epochs=50)  # Train only on normal data
 
# Detect anomalies in test data
anomalies, reconstruction_errors, threshold = detector.predict_anomalies(ts_with_anomalies)
 
# Visualization
plt.figure(figsize=(15, 8))
 
plt.subplot(2, 1, 1)
plt.plot(t, ts_with_anomalies, alpha=0.8, label='Time Series')
plt.scatter(t[anomalies], ts_with_anomalies[anomalies], 
           c='red', s=30, label='Detected Anomalies', zorder=5)
plt.title('LSTM Autoencoder Anomaly Detection')
plt.legend()
 
plt.subplot(2, 1, 2)
plt.plot(t[detector.sequence_length:], reconstruction_errors, alpha=0.8, label='Reconstruction Error')
plt.axhline(y=threshold, color='r', linestyle='--', label=f'Threshold ({threshold:.3f})')
plt.title('Reconstruction Error')
plt.xlabel('Time')
plt.ylabel('MSE')
plt.legend()
 
plt.tight_layout()
plt.show()
 
print(f"LSTM Autoencoder detected {np.sum(anomalies)} anomalies")

Evaluation Metrics

from sklearn.metrics import precision_score, recall_score, f1_score, roc_auc_score, average_precision_score
 
def evaluate_anomaly_detection(y_true, y_pred, y_scores=None):
    """
    Comprehensive evaluation of anomaly detection performance
    """
    # Basic metrics
    precision = precision_score(y_true, y_pred)
    recall = recall_score(y_true, y_pred)
    f1 = f1_score(y_true, y_pred)
    
    results = {
        'Precision': precision,
        'Recall': recall,
        'F1-Score': f1,
        'Accuracy': (y_true == y_pred).mean()
    }
    
    # If probability scores available
    if y_scores is not None:
        auc_roc = roc_auc_score(y_true, y_scores)
        auc_pr = average_precision_score(y_true, y_scores)
        results['AUC-ROC'] = auc_roc
        results['AUC-PR'] = auc_pr
    
    return results
 
# Create synthetic evaluation scenario
np.random.seed(42)
n_normal = 950
n_anomalies = 50
total_samples = n_normal + n_anomalies
 
# Generate ground truth
y_true = np.concatenate([np.zeros(n_normal), np.ones(n_anomalies)])
 
# Simulate different detector performances
detectors = {
    'Perfect Detector': {
        'predictions': y_true,
        # Scores chosen so every anomaly ranks above every normal point (AUC = 1)
        'scores': np.concatenate([np.random.uniform(0.0, 0.4, n_normal),
                                  np.random.uniform(0.6, 1.0, n_anomalies)])
    },
    'Good Detector': {
        'predictions': np.concatenate([
            np.random.choice([0, 1], n_normal, p=[0.95, 0.05]),
            np.random.choice([0, 1], n_anomalies, p=[0.15, 0.85])
        ]),
        'scores': np.concatenate([np.random.beta(2, 8, n_normal), 
                                np.random.beta(8, 2, n_anomalies)])
    },
    'Poor Detector': {
        'predictions': np.random.choice([0, 1], total_samples, p=[0.9, 0.1]),
        'scores': np.random.uniform(0, 1, total_samples)
    }
}
 
# Evaluate all detectors
print("Anomaly Detection Performance Comparison:")
print("-" * 70)
print(f"{'Detector':<15} {'Precision':<10} {'Recall':<10} {'F1-Score':<10} {'AUC-ROC':<10}")
print("-" * 70)
 
for name, detector in detectors.items():
    results = evaluate_anomaly_detection(
        y_true, 
        detector['predictions'], 
        detector['scores']
    )
    
    print(f"{name:<15} {results['Precision']:<10.3f} {results['Recall']:<10.3f} "
          f"{results['F1-Score']:<10.3f} {results['AUC-ROC']:<10.3f}")
 
# Plot ROC and PR curves
from sklearn.metrics import roc_curve, precision_recall_curve
 
plt.figure(figsize=(12, 5))
 
plt.subplot(1, 2, 1)
for name, detector in detectors.items():
    fpr, tpr, _ = roc_curve(y_true, detector['scores'])
    auc = roc_auc_score(y_true, detector['scores'])
    plt.plot(fpr, tpr, label=f'{name} (AUC = {auc:.3f})')
 
plt.plot([0, 1], [0, 1], 'k--', alpha=0.5)
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curves')
plt.legend()
 
plt.subplot(1, 2, 2)
for name, detector in detectors.items():
    precision, recall, _ = precision_recall_curve(y_true, detector['scores'])
    auc_pr = average_precision_score(y_true, detector['scores'])
    plt.plot(recall, precision, label=f'{name} (AUC = {auc_pr:.3f})')
 
plt.axhline(y=n_anomalies/total_samples, color='k', linestyle='--', alpha=0.5, 
           label='Random Classifier')
plt.xlabel('Recall')
plt.ylabel('Precision')
plt.title('Precision-Recall Curves')
plt.legend()
 
plt.tight_layout()
plt.show()
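
In practice you usually also have to pick an operating threshold on the continuous scores. One common, simple heuristic is to choose the threshold that maximizes F1 on a labeled validation set. A minimal sketch using the simulated 'Good Detector' scores from above (the small epsilon is only there to guard against division by zero):

# Choose an operating threshold by maximizing F1 over the precision-recall curve
scores = detectors['Good Detector']['scores']
precision, recall, thresholds = precision_recall_curve(y_true, scores)

# precision/recall have one more entry than thresholds; drop the last point to align
f1_scores = 2 * precision[:-1] * recall[:-1] / (precision[:-1] + recall[:-1] + 1e-12)
best_idx = np.argmax(f1_scores)
best_threshold = thresholds[best_idx]

print(f"Best threshold: {best_threshold:.3f} (F1 = {f1_scores[best_idx]:.3f})")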

Practical Implementation

Real-time Anomaly Detection System

import threading
import time
from queue import Queue
from collections import deque
 
class RealTimeAnomalyDetector:
    def __init__(self, window_size=100, update_frequency=10):
        self.window_size = window_size
        self.update_frequency = update_frequency
        self.data_buffer = deque(maxlen=window_size)
        self.detector = ZScoreAnomalyDetector(threshold=3)
        self.is_trained = False
        self.anomaly_queue = Queue()
        self.running = False
    
    def add_data_point(self, value, timestamp=None):
        """Add new data point to buffer"""
        if timestamp is None:
            timestamp = time.time()
        
        self.data_buffer.append((timestamp, value))
        
        # Refit the detector once enough points have been buffered
        # (here: on every new point after the first update_frequency points)
        if len(self.data_buffer) >= self.update_frequency:
            self._update_detector()
    
    def _update_detector(self):
        """Update detector with recent data"""
        if len(self.data_buffer) < self.update_frequency:
            return
        
        # Extract values from buffer
        values = np.array([x[1] for x in list(self.data_buffer)])
        
        # Filter out previous anomalies for training (optional)
        if self.is_trained:
            predictions = self.detector.predict(values)
            normal_values = values[predictions == 0]
            if len(normal_values) > 0:
                values = normal_values
        
        # Retrain detector
        self.detector.fit(values)
        self.is_trained = True
    
    def detect_anomaly(self, value):
        """Check if new value is anomaly"""
        if not self.is_trained:
            return False, 0.0
        
        score = self.detector.decision_function(np.array([value]))[0]
        is_anomaly = score > self.detector.threshold
        
        return is_anomaly, score
    
    def process_stream(self, data_stream):
        """Process continuous data stream"""
        anomalies_detected = []
        
        for timestamp, value in data_stream:
            # Add to buffer and potentially retrain
            self.add_data_point(value, timestamp)
            
            # Check for anomaly
            is_anomaly, score = self.detect_anomaly(value)
            
            if is_anomaly:
                anomaly_info = {
                    'timestamp': timestamp,
                    'value': value,
                    'score': score
                }
                anomalies_detected.append(anomaly_info)
                print(f"ANOMALY DETECTED: {anomaly_info}")
        
        return anomalies_detected
 
# Simulate real-time data stream
def generate_data_stream(duration=100, anomaly_probability=0.02):
    """Generate simulated data stream with timestamps"""
    stream = []
    base_time = time.time()
    
    for i in range(duration):
        timestamp = base_time + i
        
        # Generate normal or anomalous value
        if np.random.random() < anomaly_probability:
            # Anomaly
            value = np.random.normal(0, 1) + np.random.choice([-5, 5])
        else:
            # Normal
            value = np.random.normal(0, 1)
        
        stream.append((timestamp, value))
    
    return stream
 
# Example usage
detector = RealTimeAnomalyDetector(window_size=50, update_frequency=10)
data_stream = generate_data_stream(duration=200, anomaly_probability=0.05)
 
print("Processing real-time data stream...")
anomalies = detector.process_stream(data_stream)
 
print(f"\nSummary: Detected {len(anomalies)} anomalies out of {len(data_stream)} data points")
 
# Visualize results
timestamps = [x[0] for x in data_stream]
values = [x[1] for x in data_stream]
anomaly_times = [x['timestamp'] for x in anomalies]
anomaly_values = [x['value'] for x in anomalies]
 
plt.figure(figsize=(12, 6))
plt.plot(timestamps, values, alpha=0.7, label='Data Stream')
plt.scatter(anomaly_times, anomaly_values, c='red', s=50, label='Detected Anomalies')
plt.xlabel('Timestamp')
plt.ylabel('Value')
plt.title('Real-time Anomaly Detection')
plt.legend()
plt.show()

Anomaly detection is a critical capability in many domains. The choice of method depends on your specific requirements: statistical methods for simple, interpretable detection; machine learning approaches for complex patterns; and deep learning methods for high-dimensional or sequential data. The key is understanding your data characteristics and performance requirements to select the most appropriate technique.