Anomaly Detection
Anomaly detection identifies data points, patterns, or events that deviate significantly from expected behavior. It's crucial for fraud detection, network security, quality control, and monitoring systems.
Types of Anomalies
Point Anomalies
Individual data instances that are anomalous with respect to the rest of the data.
import numpy as np
import matplotlib.pyplot as plt
from sklearn.datasets import make_blobs
# Generate data with point anomalies
np.random.seed(42)
X_normal, _ = make_blobs(n_samples=300, centers=1, cluster_std=1.0)
X_anomalies = np.random.uniform(low=-6, high=6, size=(20, 2))
X = np.vstack([X_normal, X_anomalies])
plt.figure(figsize=(10, 6))
plt.scatter(X_normal[:, 0], X_normal[:, 1], alpha=0.6, label='Normal')
plt.scatter(X_anomalies[:, 0], X_anomalies[:, 1], c='red', alpha=0.8, label='Anomalies')
plt.title('Point Anomalies Example')
plt.legend()
plt.show()
Contextual Anomalies
Data points that are anomalous in a specific context but normal otherwise.
# Time series with contextual anomalies
import pandas as pd
from datetime import datetime, timedelta
# Generate seasonal time series
dates = pd.date_range(start='2023-01-01', end='2023-12-31', freq='D')
seasonal_pattern = 10 + 5 * np.sin(2 * np.pi * np.arange(len(dates)) / 365.25)
noise = np.random.normal(0, 0.5, len(dates))
ts_normal = seasonal_pattern + noise
# Add contextual anomalies: values far from the seasonal baseline at those dates
ts_anomalous = ts_normal.copy()
ts_anomalous[60] = 20   # Far above the early-March baseline (~14)
ts_anomalous[180] = 2   # Far below the late-June baseline (~10)
plt.figure(figsize=(12, 6))
plt.plot(dates, ts_normal, alpha=0.7, label='Normal Pattern')
plt.plot(dates, ts_anomalous, 'r-', alpha=0.8, label='With Contextual Anomalies')
plt.scatter(dates[[60, 180]], ts_anomalous[[60, 180]],
c='red', s=100, zorder=5, label='Anomalies')
plt.title('Contextual Anomalies in Time Series')
plt.xlabel('Date')
plt.ylabel('Value')
plt.legend()
plt.xticks(rotation=45)
plt.show()
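Detecting contextual anomalies usually means removing the context first. In this synthetic example the seasonal baseline is known exactly, so a minimal sketch is to threshold the residuals at three times the noise standard deviation (an illustrative cut-off):
# Deviation from the known seasonal baseline exposes the injected points
residuals = ts_anomalous - seasonal_pattern
context_flags = np.abs(residuals) > 3 * np.std(noise)
print(f"Flagged indices: {np.where(context_flags)[0]}")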
Collective Anomalies
Collections of data points that are anomalous when considered together.
# Network traffic with collective anomaly (DDoS attack pattern)
np.random.seed(42)
normal_traffic = np.random.poisson(lam=50, size=200)
# Sudden spike representing collective anomaly
attack_period = np.random.poisson(lam=500, size=20)
traffic = np.concatenate([
normal_traffic[:100],
attack_period,
normal_traffic[100:]
])
plt.figure(figsize=(12, 6))
plt.plot(traffic, alpha=0.8)
plt.axvspan(100, 120, alpha=0.3, color='red', label='Collective Anomaly')
plt.title('Collective Anomaly in Network Traffic')
plt.xlabel('Time')
plt.ylabel('Requests per minute')
plt.legend()
plt.show()
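A simple way to flag the whole window is to compare a rolling mean against a baseline estimated from known-normal traffic; a minimal sketch (the 10-step window and 3-sigma cut-off are illustrative choices):
# Rolling mean highlights the sustained shift; baseline from the first 100 normal minutes
traffic_series = pd.Series(traffic)
rolling_mean = traffic_series.rolling(window=10, center=True).mean()
baseline_mean = traffic_series[:100].mean()
baseline_std = traffic_series[:100].std()
collective_flags = rolling_mean > baseline_mean + 3 * baseline_std
print(f"Flagged {int(collective_flags.sum())} time steps as part of the collective anomaly")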
Statistical Methods
Z-Score Based Detection
from scipy import stats
class ZScoreAnomalyDetector:
def __init__(self, threshold=3):
self.threshold = threshold
self.mean_ = None
self.std_ = None
def fit(self, X):
self.mean_ = np.mean(X)
self.std_ = np.std(X)
return self
def predict(self, X):
z_scores = np.abs((X - self.mean_) / self.std_)
return (z_scores > self.threshold).astype(int)
def decision_function(self, X):
return np.abs((X - self.mean_) / self.std_)
# Example with synthetic data
np.random.seed(42)
data = np.random.normal(0, 1, 1000)
# Add some anomalies
data[50] = 5
data[150] = -4.5
data[300] = 6
detector = ZScoreAnomalyDetector(threshold=3)
detector.fit(data)
predictions = detector.predict(data)
plt.figure(figsize=(12, 6))
plt.plot(data, alpha=0.7, label='Data')
plt.scatter(np.where(predictions == 1)[0], data[predictions == 1],
c='red', s=50, label='Anomalies')
plt.axhline(y=detector.mean_ + detector.threshold * detector.std_, color='r', linestyle='--', alpha=0.5)
plt.axhline(y=detector.mean_ - detector.threshold * detector.std_, color='r', linestyle='--', alpha=0.5)
plt.title('Z-Score Based Anomaly Detection')
plt.legend()
plt.show()
print(f"Detected {np.sum(predictions)} anomalies")
Modified Z-Score (Robust)
def modified_z_score(data):
"""
Calculate modified Z-score using median and MAD (Median Absolute Deviation)
More robust to outliers than standard Z-score
"""
median = np.median(data)
mad = np.median(np.abs(data - median))
modified_z_scores = 0.6745 * (data - median) / mad
return modified_z_scores
# Compare standard vs modified Z-score
data_with_outliers = np.concatenate([
np.random.normal(0, 1, 950),
np.random.normal(10, 0.5, 50) # Outlier cluster
])
z_scores = np.abs((data_with_outliers - np.mean(data_with_outliers)) / np.std(data_with_outliers))
modified_z_scores = np.abs(modified_z_score(data_with_outliers))
plt.figure(figsize=(15, 5))
plt.subplot(1, 3, 1)
plt.hist(data_with_outliers, bins=50, alpha=0.7)
plt.title('Data Distribution')
plt.subplot(1, 3, 2)
plt.scatter(range(len(data_with_outliers)), z_scores, alpha=0.6)
plt.axhline(y=3, color='r', linestyle='--', label='Threshold')
plt.title('Standard Z-Score')
plt.ylabel('Z-Score')
plt.legend()
plt.subplot(1, 3, 3)
plt.scatter(range(len(data_with_outliers)), modified_z_scores, alpha=0.6)
plt.axhline(y=3.5, color='r', linestyle='--', label='Threshold')
plt.title('Modified Z-Score')
plt.ylabel('Modified Z-Score')
plt.legend()
plt.tight_layout()
plt.show()
# Count anomalies detected by each method
standard_anomalies = np.sum(z_scores > 3)
modified_anomalies = np.sum(modified_z_scores > 3.5)
print(f"Standard Z-Score anomalies: {standard_anomalies}")
print(f"Modified Z-Score anomalies: {modified_anomalies}")
Machine Learning Approaches
Isolation Forest
from sklearn.ensemble import IsolationForest
from sklearn.datasets import make_classification
# Generate dataset with anomalies
X, _ = make_classification(
n_samples=1000, n_features=2, n_informative=2,
n_redundant=0, n_clusters_per_class=1, random_state=42
)
# Add anomalies
anomalies = np.random.uniform(low=-8, high=8, size=(50, 2))
X_with_anomalies = np.vstack([X, anomalies])
y_true = np.concatenate([np.zeros(len(X)), np.ones(len(anomalies))])
# Isolation Forest
iso_forest = IsolationForest(contamination=0.1, random_state=42)
y_pred = iso_forest.fit_predict(X_with_anomalies)
y_pred = np.where(y_pred == -1, 1, 0) # Convert to 0/1
# Visualization
plt.figure(figsize=(15, 5))
plt.subplot(1, 3, 1)
plt.scatter(X[:, 0], X[:, 1], alpha=0.6, label='Normal')
plt.scatter(anomalies[:, 0], anomalies[:, 1], c='red', alpha=0.8, label='True Anomalies')
plt.title('Ground Truth')
plt.legend()
plt.subplot(1, 3, 2)
normal_pred = X_with_anomalies[y_pred == 0]
anomaly_pred = X_with_anomalies[y_pred == 1]
plt.scatter(normal_pred[:, 0], normal_pred[:, 1], alpha=0.6, label='Predicted Normal')
plt.scatter(anomaly_pred[:, 0], anomaly_pred[:, 1], c='red', alpha=0.8, label='Predicted Anomalies')
plt.title('Isolation Forest Predictions')
plt.legend()
plt.subplot(1, 3, 3)
# Decision boundary
xx, yy = np.meshgrid(np.linspace(-8, 8, 100), np.linspace(-8, 8, 100))
Z = iso_forest.decision_function(np.c_[xx.ravel(), yy.ravel()])
Z = Z.reshape(xx.shape)
plt.contour(xx, yy, Z, levels=[0], linewidths=2, colors='red')
plt.scatter(X_with_anomalies[:, 0], X_with_anomalies[:, 1], c=y_true, cmap='viridis', alpha=0.6)
plt.title('Decision Boundary')
plt.tight_layout()
plt.show()
# Evaluation
from sklearn.metrics import classification_report, confusion_matrix
print("Classification Report:")
print(classification_report(y_true, y_pred, target_names=['Normal', 'Anomaly']))
print("\nConfusion Matrix:")
print(confusion_matrix(y_true, y_pred))
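Isolation Forest also exposes a continuous score through score_samples (higher means more normal), which is convenient for threshold-free evaluation; a short sketch, negating it so larger values mean more anomalous:
from sklearn.metrics import roc_auc_score
# score_samples: higher = more normal, so negate to obtain an anomaly score
anomaly_scores = -iso_forest.score_samples(X_with_anomalies)
print(f"Isolation Forest AUC-ROC: {roc_auc_score(y_true, anomaly_scores):.3f}")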
One-Class SVM
from sklearn.svm import OneClassSVM
from sklearn.preprocessing import StandardScaler
# Prepare data: One-Class SVM and LOF are distance-based, so standardize features
# (the tree-based Isolation Forest above does not need scaling)
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X_with_anomalies)
# One-Class SVM
oc_svm = OneClassSVM(kernel='rbf', gamma='scale', nu=0.1)
y_pred_svm = oc_svm.fit_predict(X_scaled)
y_pred_svm = np.where(y_pred_svm == -1, 1, 0)
# Local Outlier Factor
from sklearn.neighbors import LocalOutlierFactor
lof = LocalOutlierFactor(n_neighbors=20, contamination=0.1)
y_pred_lof = lof.fit_predict(X_scaled)
y_pred_lof = np.where(y_pred_lof == -1, 1, 0)
# Compare methods
methods = {
'Isolation Forest': y_pred,
'One-Class SVM': y_pred_svm,
'LOF': y_pred_lof
}
fig, axes = plt.subplots(1, 3, figsize=(18, 5))
for i, (method, predictions) in enumerate(methods.items()):
normal_pred = X_with_anomalies[predictions == 0]
anomaly_pred = X_with_anomalies[predictions == 1]
axes[i].scatter(normal_pred[:, 0], normal_pred[:, 1], alpha=0.6, label='Normal')
axes[i].scatter(anomaly_pred[:, 0], anomaly_pred[:, 1], c='red', alpha=0.8, label='Anomalies')
axes[i].set_title(f'{method}')
axes[i].legend()
plt.tight_layout()
plt.show()
# Performance comparison
for method, predictions in methods.items():
precision = np.sum((predictions == 1) & (y_true == 1)) / np.sum(predictions == 1)
recall = np.sum((predictions == 1) & (y_true == 1)) / np.sum(y_true == 1)
f1 = 2 * precision * recall / (precision + recall)
print(f"{method:15} - Precision: {precision:.3f}, Recall: {recall:.3f}, F1: {f1:.3f}")
Autoencoders for Anomaly Detection
import tensorflow as tf
from tensorflow.keras import layers, Model
class AnomalyAutoencoder(Model):
def __init__(self, input_dim, encoding_dim=32):
super(AnomalyAutoencoder, self).__init__()
self.input_dim = input_dim
self.encoding_dim = encoding_dim
# Encoder
self.encoder = tf.keras.Sequential([
layers.Dense(64, activation='relu'),
layers.Dense(32, activation='relu'),
layers.Dense(encoding_dim, activation='relu')
])
# Decoder
self.decoder = tf.keras.Sequential([
layers.Dense(32, activation='relu'),
layers.Dense(64, activation='relu'),
layers.Dense(input_dim, activation='linear')
])
def call(self, x):
encoded = self.encoder(x)
decoded = self.decoder(encoded)
return decoded
# Generate normal training data (without anomalies)
from sklearn.datasets import make_blobs
X_train, _ = make_blobs(n_samples=1000, centers=3, cluster_std=1.5, random_state=42)
# Normalize data
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
# Create and train autoencoder
autoencoder = AnomalyAutoencoder(input_dim=X_train_scaled.shape[1], encoding_dim=8)
autoencoder.compile(optimizer='adam', loss='mse')
history = autoencoder.fit(
X_train_scaled, X_train_scaled,
epochs=100,
batch_size=32,
validation_split=0.2,
verbose=0
)
# Test on data with anomalies (reusing the 50 uniform anomaly points from the Isolation Forest example)
X_test = np.vstack([X_train, anomalies])
X_test_scaled = scaler.transform(X_test)
# Calculate reconstruction error
X_pred = autoencoder.predict(X_test_scaled)
reconstruction_error = tf.keras.losses.mse(X_test_scaled, X_pred)
# Set threshold based on training data reconstruction error
train_pred = autoencoder.predict(X_train_scaled)
train_error = tf.keras.losses.mse(X_train_scaled, train_pred)
threshold = np.percentile(train_error, 95) # 95th percentile
# Predict anomalies
y_pred_ae = (reconstruction_error > threshold).numpy().astype(int)
# Visualization
plt.figure(figsize=(15, 5))
plt.subplot(1, 3, 1)
plt.plot(history.history['loss'], label='Training Loss')
plt.plot(history.history['val_loss'], label='Validation Loss')
plt.title('Training History')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.subplot(1, 3, 2)
plt.scatter(range(len(reconstruction_error)), reconstruction_error, alpha=0.6)
plt.axhline(y=threshold, color='r', linestyle='--', label=f'Threshold ({threshold:.3f})')
plt.title('Reconstruction Error')
plt.xlabel('Sample Index')
plt.ylabel('MSE')
plt.legend()
plt.subplot(1, 3, 3)
normal_ae = X_test[y_pred_ae == 0]
anomaly_ae = X_test[y_pred_ae == 1]
plt.scatter(normal_ae[:, 0], normal_ae[:, 1], alpha=0.6, label='Normal')
plt.scatter(anomaly_ae[:, 0], anomaly_ae[:, 1], c='red', alpha=0.8, label='Anomalies')
plt.title('Autoencoder Anomaly Detection')
plt.legend()
plt.tight_layout()
plt.show()
print(f"Autoencoder detected {np.sum(y_pred_ae)} anomalies")
Time Series Anomaly Detection
STL Decomposition + Statistical Detection
from statsmodels.tsa.seasonal import STL
import pandas as pd
def detect_time_series_anomalies(ts, period=12, threshold=3):
    """
    Detect anomalies in a time series using STL decomposition:
    decompose, then score the residuals with a modified Z-score.
    """
    # STL decomposition (period = number of observations per seasonal cycle)
    stl = STL(ts, period=period, robust=True)
    decomposition = stl.fit()
# Use residuals for anomaly detection
residuals = decomposition.resid
# Modified Z-score on residuals
median_resid = np.median(residuals)
mad_resid = np.median(np.abs(residuals - median_resid))
modified_z_scores = 0.6745 * (residuals - median_resid) / mad_resid
anomalies = np.abs(modified_z_scores) > threshold
return anomalies, decomposition
# Generate time series with seasonal pattern and anomalies
np.random.seed(42)
dates = pd.date_range('2020-01-01', periods=365*2, freq='D')
seasonal = 10 * np.sin(2 * np.pi * np.arange(len(dates)) / 365)
trend = 0.01 * np.arange(len(dates))
noise = np.random.normal(0, 1, len(dates))
ts = seasonal + trend + noise
# Add anomalies
ts[100] += 15 # Spike
ts[300:310] -= 10 # Drop period
ts[500] += 20 # Another spike
ts = pd.Series(ts, index=dates)
# Detect anomalies (annual cycle: 365 observations per period)
anomalies, decomposition = detect_time_series_anomalies(ts, period=365)
# Plot results
fig, axes = plt.subplots(4, 1, figsize=(15, 12))
# Original time series with anomalies
axes[0].plot(ts.index, ts.values, alpha=0.8, label='Time Series')
axes[0].scatter(ts.index[anomalies], ts.values[anomalies],
c='red', s=50, label='Anomalies', zorder=5)
axes[0].set_title('Original Time Series')
axes[0].legend()
# Trend
axes[1].plot(ts.index, decomposition.trend, color='orange', label='Trend')
axes[1].set_title('Trend Component')
# Seasonal
axes[2].plot(ts.index, decomposition.seasonal, color='green', label='Seasonal')
axes[2].set_title('Seasonal Component')
# Residuals
axes[3].plot(ts.index, decomposition.resid, alpha=0.8, label='Residuals')
axes[3].scatter(ts.index[anomalies], decomposition.resid[anomalies],
c='red', s=50, label='Anomalies', zorder=5)
axes[3].set_title('Residuals')
axes[3].legend()
plt.tight_layout()
plt.show()
print(f"Detected {np.sum(anomalies)} anomalies")
LSTM Autoencoder for Time Series
def create_sequences(data, seq_length):
"""Create sequences for LSTM input"""
sequences = []
for i in range(len(data) - seq_length):
sequences.append(data[i:i+seq_length])
return np.array(sequences)
class LSTMAnomalyDetector:
def __init__(self, sequence_length=30, encoding_dim=50):
self.sequence_length = sequence_length
self.encoding_dim = encoding_dim
self.model = None
self.scaler = StandardScaler()
def build_model(self, input_shape):
# Encoder
encoder_input = layers.Input(shape=input_shape)
encoder = layers.LSTM(self.encoding_dim, return_sequences=False)(encoder_input)
encoder = layers.RepeatVector(input_shape[0])(encoder)
# Decoder
decoder = layers.LSTM(self.encoding_dim, return_sequences=True)(encoder)
decoder_output = layers.TimeDistributed(layers.Dense(input_shape[1]))(decoder)
self.model = Model(encoder_input, decoder_output)
self.model.compile(optimizer='adam', loss='mse')
    def fit(self, X, epochs=50, batch_size=32):
        # Normalize data (X is a 1-D series)
        X_scaled = self.scaler.fit_transform(X.reshape(-1, 1)).reshape(X.shape)
        # Create sequences and add the feature dimension the LSTM expects: (n, seq_length, 1)
        sequences = create_sequences(X_scaled, self.sequence_length)
        sequences = sequences[..., np.newaxis]
# Build model if not exists
if self.model is None:
self.build_model((self.sequence_length, 1))
# Train
history = self.model.fit(
sequences, sequences,
epochs=epochs,
batch_size=batch_size,
validation_split=0.2,
verbose=0
)
return history
    def predict_anomalies(self, X, threshold_percentile=95):
        # Normalize with the scaler fitted during training
        X_scaled = self.scaler.transform(X.reshape(-1, 1)).reshape(X.shape)
        # Create sequences with the same (n, seq_length, 1) shape used for training
        sequences = create_sequences(X_scaled, self.sequence_length)
        sequences = sequences[..., np.newaxis]
        # Reconstruct and compute per-sequence reconstruction error
        predictions = self.model.predict(sequences)
        mse = np.mean(np.power(sequences - predictions, 2), axis=(1, 2))
        # Set threshold (here from the test-set error distribution; in practice derive it from held-out normal data)
        threshold = np.percentile(mse, threshold_percentile)
# Detect anomalies
anomalies = mse > threshold
# Extend to original length (pad beginning with False)
anomaly_flags = np.zeros(len(X), dtype=bool)
anomaly_flags[self.sequence_length:] = anomalies
return anomaly_flags, mse, threshold
# Generate synthetic time series
np.random.seed(42)
t = np.linspace(0, 100, 2000)
ts_normal = np.sin(0.1 * t) + 0.5 * np.sin(0.3 * t) + np.random.normal(0, 0.1, len(t))
# Add anomalies
ts_with_anomalies = ts_normal.copy()
ts_with_anomalies[500:520] += 2 # Anomaly period
ts_with_anomalies[1200] -= 3 # Point anomaly
ts_with_anomalies[1600:1650] += 1.5 # Another anomaly period
# Train LSTM detector
detector = LSTMAnomalyDetector(sequence_length=50, encoding_dim=32)
history = detector.fit(ts_normal, epochs=50) # Train only on normal data
# Detect anomalies in test data
anomalies, reconstruction_errors, threshold = detector.predict_anomalies(ts_with_anomalies)
# Visualization
plt.figure(figsize=(15, 8))
plt.subplot(2, 1, 1)
plt.plot(t, ts_with_anomalies, alpha=0.8, label='Time Series')
plt.scatter(t[anomalies], ts_with_anomalies[anomalies],
c='red', s=30, label='Detected Anomalies', zorder=5)
plt.title('LSTM Autoencoder Anomaly Detection')
plt.legend()
plt.subplot(2, 1, 2)
plt.plot(t[detector.sequence_length:], reconstruction_errors, alpha=0.8, label='Reconstruction Error')
plt.axhline(y=threshold, color='r', linestyle='--', label=f'Threshold ({threshold:.4f})')
plt.title('Reconstruction Error')
plt.xlabel('Time')
plt.ylabel('MSE')
plt.legend()
plt.tight_layout()
plt.show()
print(f"LSTM Autoencoder detected {np.sum(anomalies)} anomalies")
Evaluation Metrics
from sklearn.metrics import precision_score, recall_score, f1_score, roc_auc_score, average_precision_score
def evaluate_anomaly_detection(y_true, y_pred, y_scores=None):
"""
Comprehensive evaluation of anomaly detection performance
"""
# Basic metrics
precision = precision_score(y_true, y_pred)
recall = recall_score(y_true, y_pred)
f1 = f1_score(y_true, y_pred)
results = {
'Precision': precision,
'Recall': recall,
'F1-Score': f1,
'Accuracy': (y_true == y_pred).mean()
}
# If probability scores available
if y_scores is not None:
auc_roc = roc_auc_score(y_true, y_scores)
auc_pr = average_precision_score(y_true, y_scores)
results['AUC-ROC'] = auc_roc
results['AUC-PR'] = auc_pr
return results
# Create synthetic evaluation scenario
np.random.seed(42)
n_normal = 950
n_anomalies = 50
total_samples = n_normal + n_anomalies
# Generate ground truth
y_true = np.concatenate([np.zeros(n_normal), np.ones(n_anomalies)])
# Simulate different detector performances
detectors = {
    'Perfect Detector': {
        # Hard labels are exactly right; the simulated scores still overlap, so AUC-ROC stays below 1
        'predictions': y_true,
'scores': np.concatenate([np.random.beta(2, 5, n_normal),
np.random.beta(5, 2, n_anomalies)])
},
'Good Detector': {
'predictions': np.concatenate([
np.random.choice([0, 1], n_normal, p=[0.95, 0.05]),
np.random.choice([0, 1], n_anomalies, p=[0.15, 0.85])
]),
'scores': np.concatenate([np.random.beta(2, 8, n_normal),
np.random.beta(8, 2, n_anomalies)])
},
'Poor Detector': {
'predictions': np.random.choice([0, 1], total_samples, p=[0.9, 0.1]),
'scores': np.random.uniform(0, 1, total_samples)
}
}
# Evaluate all detectors
print("Anomaly Detection Performance Comparison:")
print("-" * 70)
print(f"{'Detector':<15} {'Precision':<10} {'Recall':<10} {'F1-Score':<10} {'AUC-ROC':<10}")
print("-" * 70)
for name, detector in detectors.items():
results = evaluate_anomaly_detection(
y_true,
detector['predictions'],
detector['scores']
)
print(f"{name:<15} {results['Precision']:<10.3f} {results['Recall']:<10.3f} "
f"{results['F1-Score']:<10.3f} {results['AUC-ROC']:<10.3f}")
# Plot ROC and PR curves
from sklearn.metrics import roc_curve, precision_recall_curve
plt.figure(figsize=(12, 5))
plt.subplot(1, 2, 1)
for name, detector in detectors.items():
fpr, tpr, _ = roc_curve(y_true, detector['scores'])
auc = roc_auc_score(y_true, detector['scores'])
plt.plot(fpr, tpr, label=f'{name} (AUC = {auc:.3f})')
plt.plot([0, 1], [0, 1], 'k--', alpha=0.5)
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curves')
plt.legend()
plt.subplot(1, 2, 2)
for name, detector in detectors.items():
precision, recall, _ = precision_recall_curve(y_true, detector['scores'])
auc_pr = average_precision_score(y_true, detector['scores'])
plt.plot(recall, precision, label=f'{name} (AUC = {auc_pr:.3f})')
plt.axhline(y=n_anomalies/total_samples, color='k', linestyle='--', alpha=0.5,
label='Random Classifier')
plt.xlabel('Recall')
plt.ylabel('Precision')
plt.title('Precision-Recall Curves')
plt.legend()
plt.tight_layout()
plt.show()
Practical Implementation
Real-time Anomaly Detection System
import threading
import time
from queue import Queue
from collections import deque
class RealTimeAnomalyDetector:
def __init__(self, window_size=100, update_frequency=10):
self.window_size = window_size
self.update_frequency = update_frequency
self.data_buffer = deque(maxlen=window_size)
self.detector = ZScoreAnomalyDetector(threshold=3)
self.is_trained = False
self.anomaly_queue = Queue()
self.running = False
def add_data_point(self, value, timestamp=None):
"""Add new data point to buffer"""
if timestamp is None:
timestamp = time.time()
self.data_buffer.append((timestamp, value))
        # Refit once enough points have accumulated (and on every new point thereafter)
        if len(self.data_buffer) >= self.update_frequency:
            self._update_detector()
def _update_detector(self):
"""Update detector with recent data"""
if len(self.data_buffer) < self.update_frequency:
return
# Extract values from buffer
values = np.array([x[1] for x in list(self.data_buffer)])
# Filter out previous anomalies for training (optional)
if self.is_trained:
predictions = self.detector.predict(values)
normal_values = values[predictions == 0]
if len(normal_values) > 0:
values = normal_values
# Retrain detector
self.detector.fit(values)
self.is_trained = True
def detect_anomaly(self, value):
"""Check if new value is anomaly"""
if not self.is_trained:
return False, 0.0
score = self.detector.decision_function(np.array([value]))[0]
is_anomaly = score > self.detector.threshold
return is_anomaly, score
def process_stream(self, data_stream):
"""Process continuous data stream"""
anomalies_detected = []
for timestamp, value in data_stream:
# Add to buffer and potentially retrain
self.add_data_point(value, timestamp)
# Check for anomaly
is_anomaly, score = self.detect_anomaly(value)
if is_anomaly:
anomaly_info = {
'timestamp': timestamp,
'value': value,
'score': score
}
anomalies_detected.append(anomaly_info)
print(f"ANOMALY DETECTED: {anomaly_info}")
return anomalies_detected
# Simulate real-time data stream
def generate_data_stream(duration=100, anomaly_probability=0.02):
"""Generate simulated data stream with timestamps"""
stream = []
base_time = time.time()
for i in range(duration):
timestamp = base_time + i
# Generate normal or anomalous value
if np.random.random() < anomaly_probability:
# Anomaly
value = np.random.normal(0, 1) + np.random.choice([-5, 5])
else:
# Normal
value = np.random.normal(0, 1)
stream.append((timestamp, value))
return stream
# Example usage
detector = RealTimeAnomalyDetector(window_size=50, update_frequency=10)
data_stream = generate_data_stream(duration=200, anomaly_probability=0.05)
print("Processing real-time data stream...")
anomalies = detector.process_stream(data_stream)
print(f"\nSummary: Detected {len(anomalies)} anomalies out of {len(data_stream)} data points")
# Visualize results
timestamps = [x[0] for x in data_stream]
values = [x[1] for x in data_stream]
anomaly_times = [x['timestamp'] for x in anomalies]
anomaly_values = [x['value'] for x in anomalies]
plt.figure(figsize=(12, 6))
plt.plot(timestamps, values, alpha=0.7, label='Data Stream')
plt.scatter(anomaly_times, anomaly_values, c='red', s=50, label='Detected Anomalies')
plt.xlabel('Timestamp')
plt.ylabel('Value')
plt.title('Real-time Anomaly Detection')
plt.legend()
plt.show()
Anomaly detection is a critical capability in many domains. The choice of method depends on your specific requirements: statistical methods for simple, interpretable detection; machine learning approaches for complex patterns; and deep learning methods for high-dimensional or sequential data. The key is understanding your data characteristics and performance requirements to select the most appropriate technique.
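That rule of thumb can even be written down as a simple dispatcher; the heuristic below is purely illustrative, with made-up thresholds, not a definitive recommendation:
def suggest_detector(n_features, is_sequential, need_interpretability):
    """Illustrative heuristic for choosing a starting point, not a definitive rule."""
    if is_sequential:
        return "STL residual scoring or an LSTM autoencoder"
    if need_interpretability and n_features <= 5:
        return "Statistical (Z-score / modified Z-score)"
    if n_features > 50:
        return "Autoencoder (reconstruction error)"
    return "Isolation Forest, One-Class SVM, or LOF"

print(suggest_detector(n_features=2, is_sequential=False, need_interpretability=True))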