Unverified Commit 7fc9d59a authored by Claude's avatar Claude
Browse files

Add smart customer-based data generator with behavior simulation

parent ae9732af
Loading
Loading
Loading
Loading
+536 −0
Original line number Diff line number Diff line
"""
Smart Data Generator for Supply Chain Forecasting Educational App

Generates realistic supply chain data driven by actual customer behavior simulation.
Customers with different segments, purchase patterns, and preferences create
bottom-up demand that exhibits realistic time series properties (trends, seasonality).
"""

import numpy as np
import json
from datetime import datetime, timedelta
from typing import Dict, List, Tuple
from dataclasses import dataclass


@dataclass
class Customer:
    """
    A simulated customer whose monthly decisions create watch demand.

    All randomness is drawn from the global NumPy RNG (``np.random``), so
    results are reproducible only if the caller seeds that generator.
    """
    id: int
    segment: str  # Name of the segment that created the customer, e.g. 'luxury_buyers'
    purchase_frequency: float  # Avg months between purchases
    brand_affinity: Dict[int, float]  # Watch id -> preference weight (0-1)
    price_sensitivity: float  # How much price affects decision (0-1)
    seasonality_factor: Dict[int, float]  # Calendar month (1-12) -> purchase probability multiplier
    lifetime_value: float  # Expected total purchases; not read by the methods below

    def will_purchase_this_month(self, month: int, base_prob: float) -> bool:
        """
        Randomly decide whether the customer buys in the given month.

        Args:
            month: Zero-based month index; index 0 maps to calendar month 1,
                index 12 wraps around to calendar month 1 again
            base_prob: Base purchase probability before it is scaled by the
                customer's purchase frequency and seasonality

        Returns:
            True if a uniform draw falls below
            ``base_prob / purchase_frequency * seasonal multiplier``
        """
        # Customers that buy less often get a proportionally lower chance
        monthly_prob = base_prob / self.purchase_frequency

        # Months without an explicit multiplier are neutral (x1.0)
        month_of_year = (month % 12) + 1
        seasonal_mult = self.seasonality_factor.get(month_of_year, 1.0)

        # A product >= 1 always buys, a product <= 0 never does
        return np.random.random() < (monthly_prob * seasonal_mult)

    def choose_watch(self, available_watches: List[Dict]) -> int:
        """
        Pick one watch, weighted by brand affinity and price sensitivity.

        Args:
            available_watches: Watch dicts; each must provide 'id' and
                'sell_price'

        Returns:
            The 'id' of the chosen watch, exactly as stored in its dict (a
            plain ``int`` for integer ids, never a NumPy scalar)

        Raises:
            ValueError: If ``available_watches`` is empty
        """
        if not available_watches:
            raise ValueError("available_watches must contain at least one watch")

        watch_ids = []
        weights = []

        for watch in available_watches:
            watch_id = watch['id']
            # Watches the customer has no recorded affinity for get a small weight
            affinity = self.brand_affinity.get(watch_id, 0.1)

            # Higher prices reduce the weight, more so for price-sensitive
            # customers; the factor is floored at 0.1 so no watch is ruled out
            price_factor = 1.0 - (self.price_sensitivity * (watch['sell_price'] / 1000))
            price_factor = max(0.1, price_factor)

            weights.append(affinity * price_factor)
            watch_ids.append(watch_id)

        # Normalize weights into probabilities
        probs = np.array(weights)
        probs = probs / probs.sum()

        # Draw a position rather than the id itself: np.random.choice would
        # hand back a NumPy scalar (e.g. np.int64), which breaks the declared
        # ``int`` return type and is not JSON-serializable. Drawing an index
        # uses the same single random draw.
        position = int(np.random.choice(len(watch_ids), p=probs))
        return watch_ids[position]


class CustomerSegment:
    """A named group of customers generated from one shared configuration"""

    def __init__(self, name: str, size: int, config: Dict):
        self.name = name
        self.size = size
        self.config = config
        self.customers: List[Customer] = []

    def generate_customers(self, start_id: int) -> List[Customer]:
        """
        Create ``size`` customers with consecutive IDs and remember them.

        Per customer the global NumPy RNG is consumed in a fixed order: one
        normal draw (purchase frequency), one beta draw (price sensitivity),
        then one beta draw per watch listed in the segment's brand affinity.

        Args:
            start_id: ID assigned to the first generated customer

        Returns:
            The newly generated customers (also stored in ``self.customers``)
        """
        cfg = self.config
        generated: List[Customer] = []

        for customer_id in range(start_id, start_id + self.size):
            # Months between purchases, floored at 1.0 (at most one purchase
            # interval per month)
            months_between = max(
                1.0,
                np.random.normal(
                    cfg['purchase_frequency_mean'],
                    cfg['purchase_frequency_std'],
                ),
            )

            sensitivity = np.random.beta(
                cfg['price_sensitivity_alpha'],
                cfg['price_sensitivity_beta'],
            )

            # Each customer gets an individual preference per watch model
            affinities = {
                watch_id: np.random.beta(params['alpha'], params['beta'])
                for watch_id, params in cfg['brand_affinity'].items()
            }

            generated.append(
                Customer(
                    id=customer_id,
                    segment=self.name,
                    purchase_frequency=months_between,
                    brand_affinity=affinities,
                    price_sensitivity=sensitivity,
                    # The segment's seasonality dict is passed on as-is, so
                    # all customers of the segment share the same object
                    seasonality_factor=cfg['seasonality_factor'],
                    lifetime_value=cfg['lifetime_value'],
                )
            )

        self.customers = generated
        return generated


class SmartSupplyChainDataGenerator:
    """Generates realistic supply chain data using customer behavior simulation"""

    def __init__(self, seed: int = 42):
        """
        Set up the watch catalogue, the segment definitions and the
        starting customer base.

        Args:
            seed: Value passed to ``np.random.seed`` for reproducibility
        """
        # Seed the global NumPy RNG before anything random is drawn
        np.random.seed(seed)

        def beta_params(alpha: int, beta: int) -> Dict[str, int]:
            """Shape parameters of one Beta-distributed brand affinity"""
            return {'alpha': alpha, 'beta': beta}

        # Product catalogue: the 3 watch models
        self.watches = [
            dict(
                id=1,
                name='Luxury Classic',
                category='luxury',
                base_cost=150.0,
                sell_price=500.0,
                base_demand=80,
                peak_months=[11, 12, 1],
            ),
            dict(
                id=2,
                name='Sport Pro',
                category='sport',
                base_cost=80.0,
                sell_price=220.0,
                base_demand=150,
                peak_months=[4, 5, 6, 7],
            ),
            dict(
                id=3,
                name='Casual Style',
                category='casual',
                base_cost=40.0,
                sell_price=120.0,
                base_demand=200,
                peak_months=[9, 10],
            ),
        ]

        # Segment definitions; the insertion order matters because customers
        # are generated (and random numbers consumed) segment by segment
        self.segment_configs = {
            'luxury_buyers': dict(
                size=300,
                purchase_frequency_mean=18.0,  # Months between purchases
                purchase_frequency_std=6.0,
                price_sensitivity_alpha=2,
                price_sensitivity_beta=8,  # Skews towards low sensitivity
                brand_affinity={
                    1: beta_params(8, 2),  # Strong preference for luxury
                    2: beta_params(3, 7),
                    3: beta_params(2, 8),
                },
                seasonality_factor={11: 1.5, 12: 1.8, 1: 1.3},  # Holiday boost
                lifetime_value=2000,
            ),
            'sport_enthusiasts': dict(
                size=500,
                purchase_frequency_mean=14.0,  # Months between purchases
                purchase_frequency_std=5.0,
                price_sensitivity_alpha=4,
                price_sensitivity_beta=6,
                brand_affinity={
                    1: beta_params(2, 8),
                    2: beta_params(8, 2),  # Strong preference for sport
                    3: beta_params(4, 6),
                },
                seasonality_factor={4: 1.3, 5: 1.4, 6: 1.5, 7: 1.4},  # Spring/summer
                lifetime_value=800,
            ),
            'casual_shoppers': dict(
                size=800,
                purchase_frequency_mean=10.0,  # Months between purchases
                purchase_frequency_std=4.0,
                price_sensitivity_alpha=6,
                price_sensitivity_beta=4,  # Skews towards high sensitivity
                brand_affinity={
                    1: beta_params(2, 8),
                    2: beta_params(4, 6),
                    3: beta_params(7, 3),  # Strong preference for casual
                },
                seasonality_factor={9: 1.3, 10: 1.4},  # Back to school
                lifetime_value=500,
            ),
        }

        # Starting population, built from the segment definitions above
        self.customers = self._generate_customer_base()

        # Monthly dynamics of the customer base
        self.customer_growth_rate = 0.005  # 0.5% monthly growth
        self.churn_rate = 0.003  # 0.3% monthly churn

    def _generate_customer_base(self) -> List[Customer]:
        """Generate initial customer base from segments"""
        all_customers = []
        current_id = 1

        for segment_name, config in self.segment_configs.items():
            segment = CustomerSegment(segment_name, config['size'], config)
            customers = segment.generate_customers(current_id)
            all_customers.extend(customers)
            current_id += len(customers)

        return all_customers

    def _simulate_monthly_purchases(self, month_idx: int,
                                    active_customers: List[Customer]) -> Dict[int, int]:
        """
        Simulate customer purchases for a given month

        Args:
            month_idx: Current month index
            active_customers: List of active customers

        Returns:
            Dictionary of {watch_id: purchase_count}
        """
        purchases = {watch['id']: 0 for watch in self.watches}

        # Apply trend - base probability increases over time
        trend_factor = 1.0 + (0.002 * month_idx)  # 0.2% monthly increase
        base_purchase_prob = 0.08 * trend_factor

        for customer in active_customers:
            if customer.will_purchase_this_month(month_idx, base_purchase_prob):
                watch_id = customer.choose_watch(self.watches)
                purchases[watch_id] += 1

        return purchases

    def _update_customer_base(self, month_idx: int) -> List[Customer]:
        """Update customer base with growth and churn"""
        # Remove churned customers
        active_customers = []
        for customer in self.customers:
            if np.random.random() > self.churn_rate:
                active_customers.append(customer)

        # Add new customers (maintaining segment proportions)
        new_customers_count = int(len(active_customers) * self.customer_growth_rate)

        if new_customers_count > 0:
            # Distribute new customers across segments
            segment_names = list(self.segment_configs.keys())
            segment_sizes = [self.segment_configs[s]['size'] for s in segment_names]
            total_size = sum(segment_sizes)
            segment_probs = [s / total_size for s in segment_sizes]

            next_id = max(c.id for c in active_customers) + 1

            for _ in range(new_customers_count):
                # Choose segment
                segment_name = np.random.choice(segment_names, p=segment_probs)
                config = self.segment_configs[segment_name]

                # Create new customer
                segment = CustomerSegment(segment_name, 1, config)
                new_customer = segment.generate_customers(next_id)[0]
                active_customers.append(new_customer)
                next_id += 1

        self.customers = active_customers
        return active_customers

    def _calculate_costs_and_revenue(self, watch: Dict, demand: int,
                                     production: int, inventory_start: int) -> Dict:
        """
        Calculate monthly costs and revenue based on production decisions

        Args:
            watch: Watch model configuration
            demand: Actual customer demand
            production: Units produced
            inventory_start: Starting inventory

        Returns:
            Dictionary with financial metrics
        """
        # Calculate what we can actually sell
        available_units = inventory_start + production
        units_sold = min(demand, available_units)

        # Calculate ending inventory
        inventory_end = available_units - units_sold

        # Revenue from sales
        revenue = units_sold * watch['sell_price']

        # Costs
        production_cost = production * watch['base_cost']
        labor_cost = production * 20.0
        holding_cost = inventory_end * watch['base_cost'] * 0.02

        # Stockout cost
        stockout_units = max(0, demand - units_sold)
        stockout_cost = stockout_units * watch['sell_price'] * 0.3

        total_costs = production_cost + labor_cost + holding_cost + stockout_cost
        profit = revenue - total_costs

        return {
            'demand': int(demand),
            'production': production,
            'inventory_start': inventory_start,
            'inventory_end': inventory_end,
            'units_sold': int(units_sold),
            'stockout_units': int(stockout_units),
            'revenue': round(revenue, 2),
            'production_cost': round(production_cost, 2),
            'labor_cost': round(labor_cost, 2),
            'holding_cost': round(holding_cost, 2),
            'stockout_cost': round(stockout_cost, 2),
            'total_costs': round(total_costs, 2),
            'profit': round(profit, 2)
        }

    def generate_dataset(self, years: int = 11) -> Dict:
        """
        Generate complete dataset for specified number of years

        Args:
            years: Number of years to generate

        Returns:
            Dictionary containing all historical data
        """
        total_months = years * 12
        start_date = datetime(2014, 1, 1)

        dataset = {
            'metadata': {
                'generated_date': datetime.now().isoformat(),
                'generator_type': 'smart_customer_simulation',
                'years': years,
                'total_months': total_months,
                'start_date': start_date.isoformat(),
                'initial_customers': len(self.customers),
                'watches': self.watches
            },
            'historical_data': []
        }

        # Track inventory for each watch
        inventory = {watch['id']: 100 for watch in self.watches}

        # Generate data month by month
        for month_idx in range(total_months):
            current_date = start_date + timedelta(days=30 * month_idx)
            year = (month_idx // 12) + 1
            month_in_year = (month_idx % 12) + 1

            # Update customer base (growth and churn)
            active_customers = self._update_customer_base(month_idx)

            # Simulate customer purchases
            purchases = self._simulate_monthly_purchases(month_idx, active_customers)

            month_data = {
                'month_index': month_idx,
                'year': year,
                'month': month_in_year,
                'date': current_date.strftime('%Y-%m'),
                'active_customers': len(active_customers),
                'watches': []
            }

            # Process each watch
            for watch in self.watches:
                watch_id = watch['id']
                demand = purchases[watch_id]

                # Production strategy: produce based on demand + safety stock
                production = int(demand * 1.05)

                # Calculate financials
                watch_data = self._calculate_costs_and_revenue(
                    watch, demand, production, inventory[watch_id]
                )

                # Update inventory for next month
                inventory[watch_id] = watch_data['inventory_end']

                # Add watch info
                watch_data['watch_id'] = watch['id']
                watch_data['watch_name'] = watch['name']

                month_data['watches'].append(watch_data)

            dataset['historical_data'].append(month_data)

        # Add final customer statistics
        dataset['metadata']['final_customers'] = len(self.customers)

        return dataset

    def save_dataset(self, dataset: Dict, filepath: str = 'supply_chain_data.json'):
        """Save dataset to JSON file"""
        with open(filepath, 'w') as f:
            json.dump(dataset, f, indent=2)
        print(f"Dataset saved to {filepath}")

    def get_training_data(self, dataset: Dict, training_years: int = 10) -> Dict:
        """Extract training data (first N years) from full dataset"""
        training_months = training_years * 12

        training_data = {
            'metadata': dataset['metadata'].copy(),
            'historical_data': dataset['historical_data'][:training_months]
        }
        training_data['metadata']['years'] = training_years
        training_data['metadata']['total_months'] = training_months
        training_data['metadata']['note'] = f"Training data: first {training_years} years"

        return training_data

    def get_test_data(self, dataset: Dict, test_year: int = 11) -> List[Dict]:
        """Extract test data (year to predict)"""
        start_idx = (test_year - 1) * 12
        end_idx = test_year * 12

        return dataset['historical_data'][start_idx:end_idx]


def _watch_series(months: List[Dict], watch_id: int, field: str) -> List:
    """Collect ``field`` of one watch across ``months``, matching rows by 'watch_id'"""
    return [
        entry[field]
        for month in months
        for entry in month['watches']
        if entry['watch_id'] == watch_id
    ]


def _print_year_stats(label: str, months: List[Dict], watches: List[Dict]) -> None:
    """Print demand and revenue figures per watch for one year of monthly records"""
    print(f"\n{label}:")
    for watch in watches:
        demands = _watch_series(months, watch['id'], 'demand')
        revenues = _watch_series(months, watch['id'], 'revenue')

        print(f"  {watch['name']}:")
        print(f"    Avg Monthly Demand: {np.mean(demands):.1f} units")
        print(f"    Total Annual Demand: {np.sum(demands)} units")
        print(f"    Annual Revenue: CHF {np.sum(revenues):,.2f}")


def main():
    """
    Generate the 11-year dataset, write it to disk and print a summary.

    Three JSON files are written below ``data/`` (full dataset, 10-year
    training split, year-11 test split); the directory is created if needed.
    """
    import os  # Local import: only needed for creating the output directory

    total_years = 11
    training_years = 10

    print("=" * 60)
    print("Smart Supply Chain Dataset Generator")
    print("Customer Behavior Simulation")
    print("=" * 60)

    generator = SmartSupplyChainDataGenerator(seed=42)

    print(f"\nInitial Customer Base: {len(generator.customers)} customers")
    print("\nCustomer Segments:")
    for segment_name, config in generator.segment_configs.items():
        print(f"  - {segment_name}: {config['size']} customers")

    print("\n" + "-" * 60)
    print(f"Generating {total_years} years of data...")
    print("-" * 60)

    # Generate full dataset
    full_dataset = generator.generate_dataset(years=total_years)

    # All output files live in data/; make sure it exists before writing
    os.makedirs('data', exist_ok=True)

    # Save full dataset
    generator.save_dataset(full_dataset, 'data/sim_supply_chain_data_full.json')

    # Save training data (first 10 years)
    training_data = generator.get_training_data(full_dataset, training_years=training_years)
    generator.save_dataset(training_data, 'data/sim_supply_chain_data_training.json')

    # Save test data (year 11); this is a plain list, written without the
    # "Dataset saved" message
    test_data = generator.get_test_data(full_dataset, test_year=total_years)
    with open('data/sim_supply_chain_data_test.json', 'w', encoding='utf-8') as f:
        json.dump(test_data, f, indent=2)

    print("\n" + "=" * 60)
    print("Dataset Generation Complete!")
    print("=" * 60)
    print(f"Total months: {len(full_dataset['historical_data'])}")
    print(f"Training months: {len(training_data['historical_data'])}")
    print(f"Test months: {len(test_data)}")
    print(f"Final customer base: {full_dataset['metadata']['final_customers']} customers")

    # Print sample statistics
    print("\n" + "=" * 60)
    print("Sample Statistics:")
    print("=" * 60)

    watches = full_dataset['metadata']['watches']
    history = full_dataset['historical_data']
    year1_data = history[:12]
    # Last training year (showing growth)
    year10_data = history[(training_years - 1) * 12:training_years * 12]

    _print_year_stats("Year 1", year1_data, watches)
    _print_year_stats(f"Year {training_years}", year10_data, watches)

    # Calculate growth rates
    print("\n" + "=" * 60)
    print(f"Growth Analysis (Year 1 → Year {training_years}):")
    print("=" * 60)
    for watch in watches:
        y1_demand = sum(_watch_series(year1_data, watch['id'], 'demand'))
        y10_demand = sum(_watch_series(year10_data, watch['id'], 'demand'))

        # A percentage is undefined without any Year 1 demand
        if y1_demand == 0:
            print(f"  {watch['name']}: n/a (no Year 1 demand)")
            continue

        growth = ((y10_demand - y1_demand) / y1_demand) * 100
        print(f"  {watch['name']}: {growth:+.1f}% growth")

if __name__ == "__main__":
    main()