Applications
Unsupervised learning techniques have diverse real-world applications across industries, from data exploration and preprocessing to advanced analytics and decision support. This section walks through four practical use cases: customer segmentation, fraud detection, recommendation systems, and automated feature engineering.
Customer Segmentation
Customer segmentation uses clustering to group customers by behavior, demographics, and preferences so that marketing strategies can be targeted to each segment.
RFM Analysis with Clustering
import pandas as pd
import numpy as np
from sklearn.cluster import KMeans
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime, timedelta
# Generate synthetic customer transaction data
np.random.seed(42)
n_customers = 1000
# Generate customer data
customer_data = []
for customer_id in range(n_customers):
# Random customer behavior patterns
base_frequency = np.random.exponential(30) # Average days between purchases
base_monetary = np.random.lognormal(4, 1) # Average spending
n_transactions = np.random.poisson(5) + 1 # Number of transactions
for _ in range(n_transactions):
days_ago = np.random.exponential(base_frequency)
amount = np.random.lognormal(np.log(base_monetary), 0.5)
customer_data.append({
'customer_id': customer_id,
'transaction_date': datetime.now() - timedelta(days=days_ago),
'amount': max(amount, 5) # Minimum transaction amount
})
df_transactions = pd.DataFrame(customer_data)
def calculate_rfm_metrics(df):
"""Calculate RFM (Recency, Frequency, Monetary) metrics"""
current_date = df['transaction_date'].max()
rfm = df.groupby('customer_id').agg({
'transaction_date': lambda x: (current_date - x.max()).days, # Recency
'amount': ['count', 'sum'] # Frequency and Monetary
}).round(2)
# Flatten column names
rfm.columns = ['recency', 'frequency', 'monetary']
rfm = rfm.reset_index()
return rfm
# Calculate RFM metrics
rfm_data = calculate_rfm_metrics(df_transactions)
# RFM Scoring (1-5 scale)
def create_rfm_scores(rfm_df):
"""Create RFM scores using quintiles"""
rfm_scores = rfm_df.copy()
# Recency: Lower is better (more recent)
rfm_scores['R_score'] = pd.qcut(rfm_df['recency'], 5, labels=[5,4,3,2,1])
# Frequency: Higher is better
rfm_scores['F_score'] = pd.qcut(rfm_df['frequency'].rank(method='first'), 5, labels=[1,2,3,4,5])
# Monetary: Higher is better
rfm_scores['M_score'] = pd.qcut(rfm_df['monetary'], 5, labels=[1,2,3,4,5])
# Combine into RFM score
rfm_scores['RFM_Score'] = (
rfm_scores['R_score'].astype(str) +
rfm_scores['F_score'].astype(str) +
rfm_scores['M_score'].astype(str)
)
return rfm_scores
rfm_scores = create_rfm_scores(rfm_data)
# K-means clustering on RFM metrics
scaler = StandardScaler()
rfm_scaled = scaler.fit_transform(rfm_data[['recency', 'frequency', 'monetary']])
# Determine optimal number of clusters
inertias = []
K_range = range(2, 11)
for k in K_range:
kmeans = KMeans(n_clusters=k, random_state=42, n_init=10)
kmeans.fit(rfm_scaled)
inertias.append(kmeans.inertia_)
# Plot elbow curve
plt.figure(figsize=(12, 5))
plt.subplot(1, 2, 1)
plt.plot(K_range, inertias, 'bo-')
plt.xlabel('Number of Clusters (k)')
plt.ylabel('Inertia')
plt.title('Elbow Method for Optimal k')
plt.grid(True)
# Apply K-means with the cluster count suggested by the elbow curve
optimal_k = 5
kmeans = KMeans(n_clusters=optimal_k, random_state=42, n_init=10)
rfm_data['cluster'] = kmeans.fit_predict(rfm_scaled)
# Visualize clusters using PCA
pca = PCA(n_components=2)
rfm_pca = pca.fit_transform(rfm_scaled)
plt.subplot(1, 2, 2)
scatter = plt.scatter(rfm_pca[:, 0], rfm_pca[:, 1], c=rfm_data['cluster'], cmap='viridis')
plt.xlabel(f'PC1 ({pca.explained_variance_ratio_[0]:.2%} variance)')
plt.ylabel(f'PC2 ({pca.explained_variance_ratio_[1]:.2%} variance)')
plt.title('Customer Segments (PCA visualization)')
plt.colorbar(scatter)
plt.tight_layout()
plt.show()
# Analyze cluster characteristics
cluster_summary = rfm_data.groupby('cluster').agg({
'recency': ['mean', 'median'],
'frequency': ['mean', 'median'],
'monetary': ['mean', 'median'],
'customer_id': 'count'
}).round(2)
cluster_summary.columns = ['_'.join(col).strip() for col in cluster_summary.columns]
print("Cluster Characteristics:")
print(cluster_summary)
# Customer segment interpretation
def interpret_clusters(cluster_data):
"""Interpret customer clusters based on RFM characteristics"""
interpretations = {}
for cluster in cluster_data['cluster'].unique():
cluster_info = cluster_data[cluster_data['cluster'] == cluster]
avg_recency = cluster_info['recency'].mean()
avg_frequency = cluster_info['frequency'].mean()
avg_monetary = cluster_info['monetary'].mean()
# Rule-based interpretation
if avg_recency <= 30 and avg_frequency >= 3 and avg_monetary >= 200:
segment = "Champions"
description = "Best customers: recent, frequent, high-value purchases"
elif avg_recency <= 60 and avg_frequency >= 2 and avg_monetary >= 100:
segment = "Loyal Customers"
description = "Regular customers with good value"
elif avg_recency <= 30 and avg_frequency < 2:
segment = "New Customers"
description = "Recent customers, need nurturing"
elif avg_recency > 90 and avg_frequency >= 2:
segment = "At Risk"
description = "Previously good customers, haven't purchased recently"
elif avg_recency > 180:
segment = "Lost Customers"
description = "Haven't purchased in a long time"
else:
segment = "Potential Loyalists"
description = "Recent customers with potential for growth"
interpretations[cluster] = {
'segment': segment,
'description': description,
'count': len(cluster_info),
'avg_recency': avg_recency,
'avg_frequency': avg_frequency,
'avg_monetary': avg_monetary
}
return interpretations
cluster_interpretations = interpret_clusters(rfm_data)
print("\nCustomer Segment Interpretations:")
for cluster, info in cluster_interpretations.items():
print(f"\nCluster {cluster}: {info['segment']}")
print(f" Description: {info['description']}")
print(f" Size: {info['count']} customers ({info['count']/len(rfm_data)*100:.1f}%)")
print(f" Avg Recency: {info['avg_recency']:.0f} days")
print(f" Avg Frequency: {info['avg_frequency']:.1f} purchases")
print(f" Avg Monetary: {info['avg_monetary']:.2f}")
Fraud Detection
Anomaly detection techniques flag potentially fraudulent transactions by identifying patterns that deviate from a customer's normal behavior.
Multi-layered Fraud Detection System
from sklearn.ensemble import IsolationForest
from sklearn.neighbors import LocalOutlierFactor
from sklearn.covariance import EllipticEnvelope
from sklearn.svm import OneClassSVM
import warnings
warnings.filterwarnings('ignore')
class FraudDetectionSystem:
def __init__(self):
self.models = {
'isolation_forest': IsolationForest(contamination=0.1, random_state=42),
'local_outlier_factor': LocalOutlierFactor(contamination=0.1, novelty=True),
'elliptic_envelope': EllipticEnvelope(contamination=0.1, random_state=42),
'one_class_svm': OneClassSVM(gamma='scale', nu=0.1)
}
self.feature_importance = {}
self.thresholds = {}
def engineer_features(self, transactions_df):
"""Engineer features for fraud detection"""
features = transactions_df.copy()
# Time-based features
features['hour'] = pd.to_datetime(features['transaction_date']).dt.hour
features['day_of_week'] = pd.to_datetime(features['transaction_date']).dt.dayofweek
features['is_weekend'] = (features['day_of_week'] >= 5).astype(int)
features['is_night'] = ((features['hour'] < 6) | (features['hour'] > 22)).astype(int)
# Customer behavior features
customer_stats = transactions_df.groupby('customer_id').agg({
'amount': ['mean', 'std', 'max', 'count'],
'transaction_date': lambda x: (x.max() - x.min()).days
}).reset_index()
customer_stats.columns = ['customer_id', 'avg_amount', 'std_amount', 'max_amount',
'transaction_count', 'account_age_days']
customer_stats['std_amount'] = customer_stats['std_amount'].fillna(0)
# Merge customer statistics
features = features.merge(customer_stats, on='customer_id', how='left')
# Deviation from customer's normal behavior
features['amount_zscore'] = np.where(
features['std_amount'] > 0,
(features['amount'] - features['avg_amount']) / features['std_amount'],
0
)
# Transaction velocity (transactions in last hour - simulated)
np.random.seed(42)
features['recent_transaction_count'] = np.random.poisson(1, len(features))
# Merchant category risk (simulated)
risk_categories = ['online', 'atm', 'retail', 'restaurant', 'gas_station', 'other']
features['merchant_category'] = np.random.choice(risk_categories, len(features))
# Risk scores for categories
category_risk = {'online': 3, 'atm': 2, 'retail': 1, 'restaurant': 1, 'gas_station': 1, 'other': 2}
features['merchant_risk_score'] = features['merchant_category'].map(category_risk)
# Location risk (simulated - distance from usual location)
features['location_risk'] = np.random.exponential(1, len(features))
return features
def select_fraud_features(self, features_df):
"""Select relevant features for fraud detection"""
feature_cols = [
'amount', 'hour', 'day_of_week', 'is_weekend', 'is_night',
'amount_zscore', 'recent_transaction_count', 'merchant_risk_score',
'location_risk', 'transaction_count', 'account_age_days'
]
return features_df[feature_cols]
def train(self, transactions_df, fraud_labels=None):
"""Train the fraud detection models"""
# Engineer features
features_df = self.engineer_features(transactions_df)
X = self.select_fraud_features(features_df)
# Standardize features
self.scaler = StandardScaler()
X_scaled = self.scaler.fit_transform(X)
# Train each model
trained_models = {}
for name, model in self.models.items():
print(f"Training {name}...")
# With novelty=True, LOF supports fit/predict on new data like the other detectors
model.fit(X_scaled)
trained_models[name] = model
self.trained_models = trained_models
self.feature_names = X.columns.tolist()
return self
def predict_fraud(self, transactions_df, voting_threshold=0.5):
"""Predict fraud using ensemble of models"""
# Engineer features
features_df = self.engineer_features(transactions_df)
X = self.select_fraud_features(features_df)
X_scaled = self.scaler.transform(X)
# Get predictions from each model
predictions = {}
scores = {}
for name, model in self.trained_models.items():
if name == 'local_outlier_factor':
pred = model.predict(X_scaled)
score = -model.decision_function(X_scaled) # novelty-mode LOF: lower decision values = more anomalous
elif name == 'isolation_forest':
pred = model.predict(X_scaled)
score = -model.decision_function(X_scaled) # IF gives negative scores for outliers
else:
pred = model.predict(X_scaled)
score = -model.decision_function(X_scaled) if hasattr(model, 'decision_function') else pred
# Convert to 0/1 (1 = fraud)
predictions[name] = (pred == -1).astype(int)
scores[name] = score
# Ensemble voting
pred_df = pd.DataFrame(predictions)
ensemble_fraud_prob = pred_df.mean(axis=1)
ensemble_prediction = (ensemble_fraud_prob >= voting_threshold).astype(int)
# Combine scores
score_df = pd.DataFrame(scores)
ensemble_score = score_df.mean(axis=1)
results = pd.DataFrame({
'transaction_id': range(len(X)),
'fraud_probability': ensemble_fraud_prob,
'is_fraud_predicted': ensemble_prediction,
'fraud_score': ensemble_score
})
# Add individual model predictions
for name in predictions:
results[f'{name}_prediction'] = predictions[name]
results[f'{name}_score'] = scores[name]
return results
# Generate synthetic transaction data with fraud cases
def generate_transaction_data_with_fraud(n_transactions=5000, fraud_rate=0.02):
"""Generate synthetic transaction data with fraud cases"""
np.random.seed(42)
transactions = []
n_fraud = int(n_transactions * fraud_rate)
# Generate normal transactions
for i in range(n_transactions - n_fraud):
customer_id = np.random.randint(0, 1000)
amount = np.random.lognormal(4, 1) # Normal spending pattern
date = datetime.now() - timedelta(days=np.random.randint(0, 365))
transactions.append({
'transaction_id': i,
'customer_id': customer_id,
'amount': amount,
'transaction_date': date,
'is_fraud': 0
})
# Generate fraudulent transactions
for i in range(n_fraud):
customer_id = np.random.randint(0, 1000)
# Fraud characteristics: unusual amounts, timing, etc.
if np.random.random() < 0.5:
# Large amount fraud
amount = np.random.uniform(1000, 10000)
else:
# Unusual timing fraud
amount = np.random.lognormal(4, 1)
# Unusual timing
if np.random.random() < 0.3:
hour = np.random.choice([2, 3, 4, 5]) # Late night
date = datetime.now() - timedelta(days=np.random.randint(0, 30))
date = date.replace(hour=hour)
else:
date = datetime.now() - timedelta(days=np.random.randint(0, 365))
transactions.append({
'transaction_id': n_transactions - n_fraud + i,
'customer_id': customer_id,
'amount': amount,
'transaction_date': date,
'is_fraud': 1
})
return pd.DataFrame(transactions)
# Generate data and apply fraud detection
fraud_data = generate_transaction_data_with_fraud(n_transactions=5000, fraud_rate=0.02)
# Split into train/test
train_size = int(0.7 * len(fraud_data))
train_data = fraud_data.iloc[:train_size]
test_data = fraud_data.iloc[train_size:]
# Initialize and train fraud detection system
fraud_detector = FraudDetectionSystem()
fraud_detector.train(train_data)
# Predict on test data
predictions = fraud_detector.predict_fraud(test_data)
# Merge with ground truth
# Align predictions with test rows by position (predict_fraud preserves row order)
test_results = pd.concat([test_data.reset_index(drop=True), predictions.drop(columns=['transaction_id'])], axis=1)
# Evaluate performance
from sklearn.metrics import classification_report, confusion_matrix, roc_auc_score, roc_curve
y_true = test_results['is_fraud']
y_pred = test_results['is_fraud_predicted']
y_scores = test_results['fraud_probability']
print("Fraud Detection Performance:")
print("\nClassification Report:")
print(classification_report(y_true, y_pred, target_names=['Normal', 'Fraud']))
print("\nConfusion Matrix:")
print(confusion_matrix(y_true, y_pred))
print(f"\nROC AUC Score: {roc_auc_score(y_true, y_scores):.3f}")
# Visualize results
plt.figure(figsize=(15, 5))
plt.subplot(1, 3, 1)
plt.hist(test_results[test_results['is_fraud']==0]['fraud_probability'],
bins=30, alpha=0.7, label='Normal', density=True)
plt.hist(test_results[test_results['is_fraud']==1]['fraud_probability'],
bins=30, alpha=0.7, label='Fraud', density=True)
plt.xlabel('Fraud Probability')
plt.ylabel('Density')
plt.title('Fraud Probability Distribution')
plt.legend()
plt.subplot(1, 3, 2)
fpr, tpr, _ = roc_curve(y_true, y_scores)
plt.plot(fpr, tpr, label=f'ROC Curve (AUC = {roc_auc_score(y_true, y_scores):.3f})')
plt.plot([0, 1], [0, 1], 'k--')
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curve')
plt.legend()
plt.subplot(1, 3, 3)
# Feature importance approximation: engineer the same features the detector uses,
# then use absolute correlation with the fraud label as a rough proxy
test_features = fraud_detector.engineer_features(test_data).reset_index(drop=True)
fraud_labels = test_data['is_fraud'].reset_index(drop=True)
feature_importance = []
for feature in fraud_detector.feature_names:
    correlation = abs(test_features[feature].corr(fraud_labels))
    feature_importance.append((feature, correlation))
feature_importance.sort(key=lambda x: x[1], reverse=True)
features, importance = zip(*feature_importance[:10])
plt.barh(range(len(features)), importance)
plt.yticks(range(len(features)), features)
plt.xlabel('Feature Importance (abs correlation)')
plt.title('Top 10 Features for Fraud Detection')
plt.tight_layout()
plt.show()
# Alert system for high-risk transactions
high_risk_transactions = test_results[test_results['fraud_probability'] > 0.7]
print(f"\nHigh Risk Transactions (probability > 0.7): {len(high_risk_transactions)}")
if len(high_risk_transactions) > 0:
print("\nTop 5 highest risk transactions:")
print(high_risk_transactions[['customer_id', 'amount', 'fraud_probability', 'is_fraud']]
.sort_values('fraud_probability', ascending=False).head())
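The 0.7 alert cutoff above is arbitrary. When labeled fraud cases are available, the threshold can instead be chosen from the precision-recall trade-off; the sketch below assumes `y_true` and `y_scores` from the evaluation above.

```python
# Choose the alert threshold from the precision-recall trade-off
# (assumes y_true and y_scores from the evaluation above)
from sklearn.metrics import precision_recall_curve

prec, rec, thresholds = precision_recall_curve(y_true, y_scores)
# precision/recall arrays have one more element than thresholds; drop the last point
f1_scores = 2 * prec[:-1] * rec[:-1] / np.clip(prec[:-1] + rec[:-1], 1e-9, None)
best_idx = int(np.argmax(f1_scores))
print(f"Threshold maximizing F1: {thresholds[best_idx]:.2f} "
      f"(precision={prec[best_idx]:.2f}, recall={rec[best_idx]:.2f})")
```

In practice the final threshold is usually set by the business cost of false positives versus missed fraud rather than by F1 alone.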
Recommendation Systems
Recommendation systems suggest relevant items by learning latent structure in user-item interaction data, for example via matrix factorization.
Collaborative Filtering with Matrix Factorization
from sklearn.metrics.pairwise import cosine_similarity
from scipy.sparse.linalg import svds
import scipy.sparse as sp
class CollaborativeFilteringRecommender:
def __init__(self, n_factors=50, regularization=0.01):
self.n_factors = n_factors
self.regularization = regularization
self.user_factors = None
self.item_factors = None
self.user_means = None
self.global_mean = None
def create_interaction_matrix(self, ratings_df):
"""Create user-item interaction matrix"""
# Create pivot table
interaction_matrix = ratings_df.pivot_table(
index='user_id',
columns='item_id',
values='rating',
fill_value=0
)
# Store mappings
self.user_to_idx = {user: idx for idx, user in enumerate(interaction_matrix.index)}
self.idx_to_user = {idx: user for user, idx in self.user_to_idx.items()}
self.item_to_idx = {item: idx for idx, item in enumerate(interaction_matrix.columns)}
self.idx_to_item = {idx: item for item, idx in self.item_to_idx.items()}
return interaction_matrix.values
def fit(self, ratings_df):
"""Train the collaborative filtering model using SVD"""
# Create interaction matrix
self.interaction_matrix = self.create_interaction_matrix(ratings_df)
# Calculate global statistics
non_zero_ratings = self.interaction_matrix[self.interaction_matrix > 0]
self.global_mean = non_zero_ratings.mean()
# Calculate user means
self.user_means = np.array([
row[row > 0].mean() if np.sum(row > 0) > 0 else self.global_mean
for row in self.interaction_matrix
])
# Center the ratings (subtract user mean)
centered_matrix = self.interaction_matrix.copy()
for i in range(len(centered_matrix)):
non_zero_mask = centered_matrix[i] > 0
centered_matrix[i][non_zero_mask] -= self.user_means[i]
# Apply SVD
U, sigma, Vt = svds(centered_matrix, k=self.n_factors)
# Store factors
self.user_factors = U
self.item_factors = Vt.T
self.sigma = np.diag(sigma)
return self
def predict_rating(self, user_id, item_id):
"""Predict rating for a user-item pair"""
if user_id not in self.user_to_idx or item_id not in self.item_to_idx:
return self.global_mean
user_idx = self.user_to_idx[user_id]
item_idx = self.item_to_idx[item_id]
# Reconstruct rating
user_vec = self.user_factors[user_idx]
item_vec = self.item_factors[item_idx]
predicted_rating = np.dot(user_vec, np.dot(self.sigma, item_vec)) + self.user_means[user_idx]
# Clip to valid rating range
return np.clip(predicted_rating, 1, 5)
def recommend_items(self, user_id, n_recommendations=10, exclude_rated=True):
"""Recommend items for a user"""
if user_id not in self.user_to_idx:
# Cold start - recommend popular items
return self.get_popular_items(n_recommendations)
user_idx = self.user_to_idx[user_id]
# Predict ratings for all items
predicted_ratings = {}
for item_id, item_idx in self.item_to_idx.items():
# Skip items user has already rated
if exclude_rated and self.interaction_matrix[user_idx][item_idx] > 0:
continue
predicted_rating = self.predict_rating(user_id, item_id)
predicted_ratings[item_id] = predicted_rating
# Sort by predicted rating
recommendations = sorted(predicted_ratings.items(),
key=lambda x: x[1], reverse=True)
return recommendations[:n_recommendations]
def get_popular_items(self, n_items=10):
"""Get most popular items (fallback for cold start)"""
item_popularity = np.sum(self.interaction_matrix > 0, axis=0)
popular_indices = np.argsort(item_popularity)[::-1][:n_items]
popular_items = [(self.idx_to_item[idx], item_popularity[idx])
for idx in popular_indices]
return popular_items
def find_similar_users(self, user_id, n_similar=10):
"""Find similar users using cosine similarity"""
if user_id not in self.user_to_idx:
return []
user_idx = self.user_to_idx[user_id]
user_vector = self.user_factors[user_idx].reshape(1, -1)
# Calculate similarities with all users
similarities = cosine_similarity(user_vector, self.user_factors)[0]
# Get top similar users (excluding self)
similar_indices = np.argsort(similarities)[::-1][1:n_similar+1]
similar_users = [(self.idx_to_user[idx], similarities[idx])
for idx in similar_indices]
return similar_users
# Generate synthetic rating data
def generate_rating_data(n_users=1000, n_items=500, n_ratings=20000):
"""Generate synthetic user-item ratings"""
np.random.seed(42)
# User preferences (latent factors)
user_preferences = np.random.normal(0, 1, (n_users, 5))
# Item characteristics (latent factors)
item_characteristics = np.random.normal(0, 1, (n_items, 5))
ratings = []
for _ in range(n_ratings):
user_id = np.random.randint(0, n_users)
item_id = np.random.randint(0, n_items)
# Generate rating based on user-item compatibility
compatibility = np.dot(user_preferences[user_id], item_characteristics[item_id])
base_rating = 3 + compatibility # Base rating around 3
# Add noise and clip to 1-5 range
rating = np.clip(base_rating + np.random.normal(0, 0.5), 1, 5)
ratings.append({
'user_id': user_id,
'item_id': item_id,
'rating': round(rating, 1)
})
return pd.DataFrame(ratings)
# Create and train recommender
rating_data = generate_rating_data()
print(f"Generated {len(rating_data)} ratings for {rating_data['user_id'].nunique()} users and {rating_data['item_id'].nunique()} items")
recommender = CollaborativeFilteringRecommender(n_factors=20)
recommender.fit(rating_data)
# Test recommendations
test_user = 42
recommendations = recommender.recommend_items(test_user, n_recommendations=10)
similar_users = recommender.find_similar_users(test_user, n_similar=5)
print(f"\nRecommendations for User {test_user}:")
for item_id, predicted_rating in recommendations:
print(f" Item {item_id}: Predicted rating {predicted_rating:.2f}")
print(f"\nUsers similar to User {test_user}:")
for similar_user, similarity in similar_users:
print(f" User {similar_user}: Similarity {similarity:.3f}")
# Evaluate recommender performance
def evaluate_recommender(recommender, test_ratings, k=10):
"""Evaluate recommender using precision@k and recall@k"""
precisions = []
recalls = []
for user_id in test_ratings['user_id'].unique()[:100]: # Sample for efficiency
# Get user's actual high ratings (>= 4)
user_test_items = test_ratings[
(test_ratings['user_id'] == user_id) &
(test_ratings['rating'] >= 4)
]['item_id'].tolist()
if len(user_test_items) == 0:
continue
# Get recommendations
recommendations = recommender.recommend_items(user_id, n_recommendations=k)
recommended_items = [item_id for item_id, _ in recommendations]
# Calculate precision and recall
relevant_recommended = set(recommended_items) & set(user_test_items)
precision = len(relevant_recommended) / len(recommended_items) if recommended_items else 0
recall = len(relevant_recommended) / len(user_test_items) if user_test_items else 0
precisions.append(precision)
recalls.append(recall)
return np.mean(precisions), np.mean(recalls)
# Split data for evaluation
train_size = int(0.8 * len(rating_data))
train_ratings = rating_data.iloc[:train_size]
test_ratings = rating_data.iloc[train_size:]
# Train on training data
eval_recommender = CollaborativeFilteringRecommender(n_factors=20)
eval_recommender.fit(train_ratings)
# Evaluate
precision, recall = evaluate_recommender(eval_recommender, test_ratings, k=10)
print(f"\nRecommender Performance:")
print(f"Precision@10: {precision:.3f}")
print(f"Recall@10: {recall:.3f}")
print(f"F1@10: {2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0:.3f}")
Data Preprocessing and Feature Engineering
Unsupervised techniques such as clustering and dimensionality reduction can also act as feature generators inside an automated preprocessing pipeline.
Automated Feature Engineering Pipeline
from sklearn.preprocessing import LabelEncoder, OneHotEncoder
from sklearn.feature_selection import SelectKBest, f_classif, mutual_info_classif
from sklearn.decomposition import PCA, FastICA
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
class AutoFeatureEngineering:
def __init__(self):
self.preprocessors = {}
self.feature_selectors = {}
self.dimensionality_reducers = {}
self.generated_features = []
def analyze_dataset(self, df, target_column=None):
"""Analyze dataset characteristics"""
analysis = {
'n_rows': len(df),
'n_columns': len(df.columns),
'column_types': df.dtypes.to_dict(),
'missing_values': df.isnull().sum().to_dict(),
'categorical_columns': df.select_dtypes(include=['object', 'category']).columns.tolist(),
'numerical_columns': df.select_dtypes(include=[np.number]).columns.tolist(),
'high_cardinality_cats': [],
'low_variance_features': [],
'highly_correlated_pairs': []
}
# Identify high cardinality categorical features
for col in analysis['categorical_columns']:
if df[col].nunique() > 50:
analysis['high_cardinality_cats'].append(col)
# Identify low variance numerical features
numerical_df = df[analysis['numerical_columns']]
for col in numerical_df.columns:
if numerical_df[col].var() < 0.01:
analysis['low_variance_features'].append(col)
# Find highly correlated feature pairs
if len(analysis['numerical_columns']) > 1:
corr_matrix = numerical_df.corr().abs()
upper_tri = corr_matrix.where(
np.triu(np.ones(corr_matrix.shape), k=1).astype(bool)
)
high_corr_pairs = [
(col, row) for col in upper_tri.columns
for row in upper_tri.index
if upper_tri.loc[row, col] > 0.95
]
analysis['highly_correlated_pairs'] = high_corr_pairs
return analysis
def preprocess_features(self, df, target_column=None):
"""Preprocess features based on their characteristics"""
processed_df = df.copy()
analysis = self.analyze_dataset(df, target_column)
# Handle missing values
for col, missing_count in analysis['missing_values'].items():
if missing_count > 0:
if col in analysis['numerical_columns']:
# Fill numerical missing values with median
processed_df[col].fillna(processed_df[col].median(), inplace=True)
else:
# Fill categorical missing values with mode
processed_df[col].fillna(processed_df[col].mode()[0], inplace=True)
# Encode categorical variables
# Encode categorical variables
encoded_features = []
for col in analysis['categorical_columns']:
    if col == target_column:
        continue
    n_unique = processed_df[col].nunique()
    if n_unique <= 10:  # One-hot encode low-cardinality categoricals
        # scikit-learn >= 1.2 uses sparse_output instead of the removed sparse argument
        encoder = OneHotEncoder(sparse_output=False, drop='first')
        encoded = encoder.fit_transform(processed_df[[col]])
        feature_names = [f"{col}_{cat}" for cat in encoder.categories_[0][1:]]
        encoded_df = pd.DataFrame(encoded, columns=feature_names, index=processed_df.index)
        encoded_features.extend(feature_names)
        processed_df = pd.concat([processed_df, encoded_df], axis=1)
        self.preprocessors[col] = ('onehot', encoder)
    else:  # Label encode high-cardinality categoricals
        encoder = LabelEncoder()
        processed_df[f"{col}_encoded"] = encoder.fit_transform(processed_df[col].astype(str))
        encoded_features.append(f"{col}_encoded")
        self.preprocessors[col] = ('label', encoder)
# Drop original categorical columns
processed_df.drop(columns=analysis['categorical_columns'], inplace=True, errors='ignore')
return processed_df, encoded_features

def generate_polynomial_features(self, df, numerical_columns, degree=2, interaction_only=True):
    """Generate polynomial and interaction features"""
    from sklearn.preprocessing import PolynomialFeatures
    if len(numerical_columns) > 10:  # Limit for computational efficiency
        numerical_columns = numerical_columns[:10]
    poly = PolynomialFeatures(degree=degree, interaction_only=interaction_only, include_bias=False)
    poly_features = poly.fit_transform(df[numerical_columns])
    feature_names = poly.get_feature_names_out(numerical_columns)
    poly_df = pd.DataFrame(poly_features, columns=feature_names, index=df.index)
    # Keep only the new polynomial/interaction terms (drop the original features)
    new_features = [name for name in feature_names if name not in numerical_columns]
    return poly_df[new_features], new_features

def apply_clustering_features(self, df, n_clusters=5):
    """Generate cluster-based features"""
    numerical_columns = df.select_dtypes(include=[np.number]).columns.tolist()
    if len(numerical_columns) < 2:
        return pd.DataFrame(index=df.index), []
    scaler = StandardScaler()
    scaled_data = scaler.fit_transform(df[numerical_columns])
    cluster_features = pd.DataFrame(index=df.index)
    feature_names = []
    # K-means cluster assignment as a categorical feature
    kmeans = KMeans(n_clusters=n_clusters, random_state=42, n_init=10)
    cluster_features['kmeans_cluster'] = kmeans.fit_predict(scaled_data)
    feature_names.append('kmeans_cluster')
    # Distance to each cluster center as continuous features
    distances = kmeans.transform(scaled_data)
    for i in range(n_clusters):
        col_name = f'distance_to_cluster_{i}'
        cluster_features[col_name] = distances[:, i]
        feature_names.append(col_name)
    return cluster_features, feature_names

def select_best_features(self, X, y, k=50, method='mutual_info'):
    """Select best features using univariate scoring"""
    if method == 'mutual_info':
        selector = SelectKBest(score_func=mutual_info_classif, k=min(k, X.shape[1]))
    else:
        selector = SelectKBest(score_func=f_classif, k=min(k, X.shape[1]))
    X_selected = selector.fit_transform(X, y)
    selected_features = X.columns[selector.get_support()].tolist()
    self.feature_selectors[method] = selector
    return pd.DataFrame(X_selected, columns=selected_features, index=X.index), selected_features

def apply_dimensionality_reduction(self, X, n_components=10, methods=['pca', 'ica']):
    """Apply multiple dimensionality reduction techniques"""
    reduced_features = pd.DataFrame(index=X.index)
    feature_names = []
    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X)
    for method in methods:
        if method == 'pca':
            reducer = PCA(n_components=min(n_components, X.shape[1]))
            reduced = reducer.fit_transform(X_scaled)
            cols = [f'pca_{i}' for i in range(reduced.shape[1])]
        elif method == 'ica':
            reducer = FastICA(n_components=min(n_components, X.shape[1]), random_state=42)
            reduced = reducer.fit_transform(X_scaled)
            cols = [f'ica_{i}' for i in range(reduced.shape[1])]
        elif method == 'lda' and hasattr(self, 'target_column'):
            # LDA would require a target for supervised dimensionality reduction
            continue
        reduced_df = pd.DataFrame(reduced, columns=cols, index=X.index)
        reduced_features = pd.concat([reduced_features, reduced_df], axis=1)
        feature_names.extend(cols)
        self.dimensionality_reducers[method] = (scaler, reducer)
    return reduced_features, feature_names

def create_comprehensive_features(self, df, target_column=None, max_features=200):
    """Create a comprehensive feature set combining all techniques"""
    print("Analyzing dataset...")
    analysis = self.analyze_dataset(df, target_column)
    print(f"Dataset: {analysis['n_rows']} rows, {analysis['n_columns']} columns")
    print(f"Categorical: {len(analysis['categorical_columns'])}, Numerical: {len(analysis['numerical_columns'])}")
    # Preprocess basic features
    print("\nPreprocessing features...")
    processed_df, encoded_features = self.preprocess_features(df, target_column)
    # Separate target if provided
    if target_column and target_column in processed_df.columns:
        y = processed_df[target_column]
        X = processed_df.drop(columns=[target_column])
    else:
        y = None
        X = processed_df
    all_features = [X]
    feature_names_list = [X.columns.tolist()]
    # Generate polynomial features
    numerical_cols = X.select_dtypes(include=[np.number]).columns.tolist()
    if len(numerical_cols) >= 2:
        print("Generating polynomial features...")
        poly_features, poly_names = self.generate_polynomial_features(
            X, numerical_cols[:5], degree=2  # Limit for efficiency
        )
        all_features.append(poly_features)
        feature_names_list.append(poly_names)
    # Generate clustering features
    if len(numerical_cols) >= 2:
        print("Generating clustering features...")
        cluster_features, cluster_names = self.apply_clustering_features(X, n_clusters=3)
        all_features.append(cluster_features)
        feature_names_list.append(cluster_names)
    # Combine all features
    combined_features = pd.concat(all_features, axis=1)
    print(f"\nTotal features before selection: {combined_features.shape[1]}")
    # Feature selection if a target is provided
    if y is not None and combined_features.shape[1] > max_features:
        print("Selecting best features...")
        selected_features, selected_names = self.select_best_features(
            combined_features, y, k=max_features, method='mutual_info'
        )
    else:
        selected_features = combined_features
        selected_names = combined_features.columns.tolist()
    # Dimensionality reduction on large feature sets
    if selected_features.shape[1] > 50:
        print("Applying dimensionality reduction...")
        reduced_features, reduced_names = self.apply_dimensionality_reduction(
            selected_features, n_components=20
        )
        # Combine selected and reduced features
        final_features = pd.concat([selected_features, reduced_features], axis=1)
    else:
        final_features = selected_features
    print(f"Final feature set: {final_features.shape[1]} features")
    if y is not None:
        return final_features, y
    else:
        return final_features
# Example usage with Titanic dataset (simulated)
def create_sample_dataset():
\"\"\"Create sample dataset for demonstration\"\"\"\n np.random.seed(42)\n n_samples = 1000\n \n data = {\n 'age': np.random.normal(35, 12, n_samples),\n 'fare': np.random.lognormal(3, 1, n_samples),\n 'sex': np.random.choice(['male', 'female'], n_samples),\n 'class': np.random.choice(['1', '2', '3'], n_samples),\n 'embarked': np.random.choice(['S', 'C', 'Q'], n_samples, p=[0.7, 0.2, 0.1]),\n 'family_size': np.random.poisson(2, n_samples),\n 'cabin': [f'C{np.random.randint(1, 100)}' if np.random.random() > 0.7 else np.nan \n for _ in range(n_samples)]\n }\n \n # Create target variable with some logic\n survival_prob = (\n 0.7 * (data['sex'] == 'female').astype(int) +\n 0.3 * (np.array(data['class']) == '1').astype(int) +\n 0.2 * (np.array(data['age']) < 16).astype(int) +\n 0.1 * np.random.random(n_samples)\n )\n \n data['survived'] = (survival_prob > 0.5).astype(int)\n \n return pd.DataFrame(data)
# Demonstrate automated feature engineering\ndf = create_sample_dataset()\nprint(\"Original dataset:\")\nprint(df.info())\n\n# Apply automated feature engineering\nfeature_engineer = AutoFeatureEngineering()\nX_engineered, y = feature_engineer.create_comprehensive_features(\n df, target_column='survived', max_features=50\n)\n\nprint(f\"\\nEngineered features shape: {X_engineered.shape}\")\nprint(f\"Feature names: {X_engineered.columns.tolist()[:10]}...\") # Show first 10\n\n# Evaluate feature engineering impact\nfrom sklearn.model_selection import cross_val_score\nfrom sklearn.ensemble import RandomForestClassifier\n\n# Original features (basic preprocessing only)\nbasic_processed, _ = feature_engineer.preprocess_features(df, 'survived')\nX_basic = basic_processed.drop(columns=['survived'])\n\n# Compare model performance\nrf = RandomForestClassifier(n_estimators=100, random_state=42)\n\nscores_basic = cross_val_score(rf, X_basic, y, cv=5, scoring='roc_auc')\nscores_engineered = cross_val_score(rf, X_engineered, y, cv=5, scoring='roc_auc')\n\nprint(f\"\\nModel Performance Comparison (ROC AUC):\")\nprint(f\"Basic features: {scores_basic.mean():.3f} ± {scores_basic.std():.3f}\")\nprint(f\"Engineered features: {scores_engineered.mean():.3f} ± {scores_engineered.std():.3f}\")\nprint(f\"Improvement: {scores_engineered.mean() - scores_basic.mean():.3f}\")\n```\n\nThese applications demonstrate the versatility of unsupervised learning techniques across different domains. The key to successful implementation is understanding your data characteristics, business objectives, and selecting appropriate algorithms and parameters for your specific use case.\n\nEach application requires careful consideration of:\n- Data quality and preprocessing needs\n- Algorithm selection based on data characteristics\n- Parameter tuning and validation strategies\n- Interpretation and actionability of results\n- Integration with existing business processes\n\nUnsupervised learning provides powerful tools for discovering hidden patterns, reducing complexity, and enabling data-driven decision making across various industries and applications.