Unverified Commit 75454eed authored by Claude's avatar Claude
Browse files

Add sim_data_generator.py with gen_data() function

Created a new data generator that uses the full supply chain simulation
to generate data in the exact same format as forecast_app/data_generator.py.

Key features:
- gen_data(config) function for easy integration
- Uses complete supply chain simulation with all entities
- Outputs data in forecast_app compatible format
- Supports configurable parameters (seed, years, scenario, etc.)
- Aggregates simulation results into monthly watch-level metrics
- Includes all required fields: demand, production, inventory, costs, profit

The generator can be imported directly:
  from supply_chain_sim import gen_data
  dataset = gen_data(config)
parent 977c0cd5
Loading
Loading
Loading
Loading
+4 −1
Original line number Diff line number Diff line
@@ -10,6 +10,7 @@ from .distribution import RetailDistribution
from .customer_behavior import CustomerBehavior
from .model_update import ModelUpdate
from .research_logging import ResearchLogging
from .sim_data_generator import gen_data, SimDataGenerator

__version__ = "0.1.0"

@@ -24,5 +25,7 @@ __all__ = [
    "RetailDistribution",
    "CustomerBehavior",
    "ModelUpdate",
    "ResearchLogging"
    "ResearchLogging",
    "gen_data",
    "SimDataGenerator"
]
 No newline at end of file
+351 −0
Original line number Diff line number Diff line
"""
Simulation-based Data Generator for Supply Chain Forecasting

Uses the full supply chain simulation to generate realistic data in the same format
as the forecast_app data_generator.py output.
"""

import numpy as np
import json
from datetime import datetime, timedelta
from typing import Dict, List
from collections import defaultdict

from .simulation import SupplyChainSimulation
from .entities import Sale


class SimDataGenerator:
    """Generate forecast_app-style monthly data from the supply chain simulation.

    The generator runs a ``SupplyChainSimulation``, then folds its sales
    records into one entry per month and per exported watch model, estimating
    production, inventory and cost figures from the units sold.
    """

    # Number of simulated watch models exported (the first N of the simulation).
    _MAX_WATCHES = 3
    # Inventory assumed to be on hand for every exported watch before month 0.
    _INITIAL_INVENTORY = 100
    # Production is estimated as units sold plus this relative buffer.
    _PRODUCTION_BUFFER = 1.1
    # Flat labor cost charged per produced unit.
    _LABOR_COST_PER_UNIT = 20.0
    # Monthly holding cost as a fraction of base cost per unit left in stock.
    _HOLDING_COST_RATE = 0.02
    # Stockout penalty as a fraction of the sell price per missing unit.
    _STOCKOUT_PENALTY_RATE = 0.3

    def __init__(self, seed: int = 42):
        """
        Initialize the simulation-based data generator

        Note: this also seeds NumPy's global random state.

        Args:
            seed: Random seed for reproducibility
        """
        self.seed = seed
        np.random.seed(seed)

    def _build_sim_config(self, years: int, config: Dict = None) -> Dict:
        """
        Build the configuration passed to the simulation

        Built-in defaults are applied first, caller-supplied keys override
        them, and ``seed`` / ``n_months`` are always forced so they stay
        consistent with this generator and the requested horizon. The
        caller's dictionary is never modified.

        Args:
            years: Number of years to simulate
            config: Optional overrides (e.g. scenario, n_customers, n_retailers)

        Returns:
            A new configuration dictionary
        """
        sim_config = {
            'n_brands': 2,
            'n_suppliers': 5,
            'n_components': 15,
            'n_watches': 8,  # Generate 8 but only the first few are exported
            'n_warehouses': 2,
            'n_retailers': 6,
            'n_customers': 500,
            'scenario': 'basic',
        }
        if config:
            sim_config.update(config)

        # These two must match the generator, whatever the caller passed in.
        sim_config['seed'] = self.seed
        sim_config['n_months'] = years * 12
        return sim_config

    def _aggregate_monthly_data(self, sim: "SupplyChainSimulation", years: int) -> Dict:
        """
        Aggregate simulation results into monthly data format matching data_generator.py

        Args:
            sim: Completed simulation object; ``sim.setup.watches`` and
                ``sim.all_sales`` are read
            years: Number of years simulated

        Returns:
            Dataset with a 'metadata' section and one 'historical_data' entry
            per month, each holding one record per exported watch
        """
        total_months = years * 12
        start_date = datetime(2014, 1, 1)

        # Describe the exported watch models. Demand-shape fields are fixed
        # placeholders; only the peak months depend on the category.
        watches_info = [
            {
                'id': watch.id + 1,  # 1-indexed for compatibility
                'name': watch.name,
                'category': watch.category,
                'base_cost': round(watch.base_cost, 2),
                'sell_price': round(watch.sell_price, 2),
                'base_demand': 100,  # Placeholder
                'seasonality_amplitude': 0.2,
                'trend': 0.005,
                'peak_months': [11, 12] if watch.category == 'luxury' else [6, 7]
            }
            for watch in sim.setup.watches[:self._MAX_WATCHES]
        ]

        dataset = {
            'metadata': {
                'generated_date': datetime.now().isoformat(),
                'years': years,
                'total_months': total_months,
                'start_date': start_date.isoformat(),
                'watches': watches_info,
                'source': 'supply_chain_simulation'
            },
            'historical_data': []
        }

        # Group sales by 0-indexed month, then by the simulation's watch id.
        # Assumes sale.simulation_month is 1-based - TODO confirm against the simulation.
        sales_by_month = defaultdict(lambda: defaultdict(list))
        for sale in sim.all_sales:
            sales_by_month[sale.simulation_month - 1][sale.watch_id].append(sale)

        # Running inventory per exported watch, keyed by the simulation's 0-indexed id.
        inventory_tracker = {
            watch['id'] - 1: self._INITIAL_INVENTORY for watch in watches_info
        }

        for month_idx in range(total_months):
            # Use real calendar arithmetic: stepping by 30-day timedeltas
            # drifts and yields duplicate / wrong 'YYYY-MM' labels.
            year_offset, month_offset = divmod(month_idx, 12)
            current_date = datetime(start_date.year + year_offset, month_offset + 1, 1)

            month_data = {
                'month_index': month_idx,
                'year': year_offset + 1,
                'month': month_offset + 1,
                'date': current_date.strftime('%Y-%m'),
                'watches': []
            }

            # .get() avoids creating empty defaultdict entries for quiet months.
            month_sales = sales_by_month.get(month_idx, {})

            for watch_info in watches_info:
                watch_id = watch_info['id'] - 1  # Convert to 0-indexed
                sales = month_sales.get(watch_id, [])

                # 'demand' counts sale records; 'units_sold' sums their quantities.
                demand = len(sales)
                units_sold = sum(sale.quantity for sale in sales)
                revenue = sum(sale.total_price for sale in sales)

                # Production is not read from the simulation; it is estimated
                # as units sold plus a small buffer (never less than units sold).
                production = max(units_sold, int(units_sold * self._PRODUCTION_BUFFER))

                inventory_start = inventory_tracker[watch_id]
                inventory_end = max(0, inventory_start + production - units_sold)
                inventory_tracker[watch_id] = inventory_end

                # Approximation: units sold beyond the opening inventory.
                stockout_units = max(0, units_sold - inventory_start)

                base_cost = watch_info['base_cost']
                sell_price = watch_info['sell_price']

                production_cost = production * base_cost
                labor_cost = production * self._LABOR_COST_PER_UNIT
                holding_cost = inventory_end * base_cost * self._HOLDING_COST_RATE
                stockout_cost = stockout_units * sell_price * self._STOCKOUT_PENALTY_RATE

                total_costs = production_cost + labor_cost + holding_cost + stockout_cost
                profit = revenue - total_costs

                month_data['watches'].append({
                    'watch_id': watch_info['id'],
                    'watch_name': watch_info['name'],
                    # With no sale records units_sold is 0 as well, so demand is simply 0.
                    'demand': int(demand),
                    'production': production,
                    'inventory_start': inventory_start,
                    'inventory_end': inventory_end,
                    'units_sold': int(units_sold),
                    'stockout_units': int(stockout_units),
                    'revenue': round(revenue, 2),
                    'production_cost': round(production_cost, 2),
                    'labor_cost': round(labor_cost, 2),
                    'holding_cost': round(holding_cost, 2),
                    'stockout_cost': round(stockout_cost, 2),
                    'total_costs': round(total_costs, 2),
                    'profit': round(profit, 2)
                })

            dataset['historical_data'].append(month_data)

        return dataset

    def generate_dataset(self, years: int = 11, config: Dict = None) -> Dict:
        """
        Generate complete dataset using simulation for specified number of years

        Args:
            years: Number of years to generate (default 11)
            config: Optional simulation configuration; its keys override the
                built-in defaults (except 'seed' and 'n_months') and the
                dictionary itself is left unmodified

        Returns:
            Dictionary containing all historical data in forecast_app format
        """
        sim_config = self._build_sim_config(years, config)

        print(f"Running supply chain simulation for {years} years ({years * 12} months)...")
        print("-" * 60)

        # Create and run simulation
        sim = SupplyChainSimulation(sim_config)

        # Run simulation without student predictions (uses automatic forecasting)
        sim.run_simulation_with_predictions(predictions=None)

        print("\n" + "-" * 60)
        print("Aggregating simulation data into forecast format...")

        # Aggregate data into required format
        dataset = self._aggregate_monthly_data(sim, years)

        print(f"✓ Generated dataset with {len(dataset['historical_data'])} months")

        return dataset

    def save_dataset(self, dataset: Dict, filepath: str = 'supply_chain_data.json'):
        """
        Save dataset to JSON file

        Args:
            dataset: Dataset to serialize
            filepath: Destination path (overwritten if it exists)
        """
        with open(filepath, 'w', encoding='utf-8') as f:
            json.dump(dataset, f, indent=2)
        print(f"Dataset saved to {filepath}")

    def get_training_data(self, dataset: Dict, training_years: int = 10) -> Dict:
        """
        Extract training data (first N years) from full dataset

        Args:
            dataset: Full dataset
            training_years: Number of years to include in training set

        Returns:
            Dictionary with training data only; its metadata is a shallow
            copy, so the top-level metadata of the full dataset is not changed
        """
        training_months = training_years * 12

        training_data = {
            'metadata': dataset['metadata'].copy(),
            'historical_data': dataset['historical_data'][:training_months]
        }
        training_data['metadata']['years'] = training_years
        training_data['metadata']['total_months'] = training_months
        training_data['metadata']['note'] = f"Training data: first {training_years} years"

        return training_data

    def get_test_data(self, dataset: Dict, test_year: int = 11) -> List[Dict]:
        """
        Extract test data (year to predict)

        Args:
            dataset: Full dataset
            test_year: Year number to extract (1-based)

        Returns:
            List of monthly data for the test year (empty if the dataset does
            not reach that year)
        """
        start_idx = (test_year - 1) * 12
        end_idx = test_year * 12

        return dataset['historical_data'][start_idx:end_idx]


def gen_data(config: Dict = None) -> Dict:
    """
    Main function to generate simulation-based supply chain data

    The caller's ``config`` dictionary is copied before use, so it is never
    modified by this function.

    Args:
        config: Configuration dictionary with optional parameters:
            - seed (int): Random seed for reproducibility (default: 42)
            - years (int): Number of years to simulate (default: 11)
            - any other keys (e.g. scenario, n_customers, n_retailers) are
              forwarded as-is to SimDataGenerator.generate_dataset, which
              decides how they are applied to the simulation

    Returns:
        Dictionary with historical data in forecast_app format:
        {
            'metadata': {...},
            'historical_data': [
                {
                    'month_index': 0,
                    'year': 1,
                    'month': 1,
                    'date': '2014-01',
                    'watches': [
                        {
                            'watch_id': 1,
                            'watch_name': '...',
                            'demand': 100,
                            'production': 105,
                            'inventory_start': 100,
                            'inventory_end': 105,
                            'units_sold': 100,
                            'stockout_units': 0,
                            'revenue': 50000.0,
                            'production_cost': 15750.0,
                            'labor_cost': 2100.0,
                            'holding_cost': 315.0,
                            'stockout_cost': 0.0,
                            'total_costs': 18165.0,
                            'profit': 31835.0
                        },
                        ...
                    ]
                },
                ...
            ]
        }
    """
    # Work on a private copy: the dictionary handed to generate_dataset() may
    # be updated there, and the caller's object must not change as a side effect.
    config = dict(config) if config else {}
    seed = config.get('seed', 42)
    years = config.get('years', 11)

    generator = SimDataGenerator(seed=seed)
    return generator.generate_dataset(years=years, config=config)


def main():
    """Run a short two-year demo generation and print a summary to stdout."""
    rule = "=" * 60

    print(rule)
    print("SIMULATION-BASED DATA GENERATOR")
    print(rule)
    print("\nGenerating supply chain data using full simulation...")

    # A two-year horizon keeps the demo run short.
    demo_config = dict(
        seed=42,
        years=2,
        scenario='basic',
        n_customers=500,
        n_retailers=6,
    )
    dataset = gen_data(demo_config)

    # Overall size of what was generated.
    print("\n" + rule)
    print("DATASET SUMMARY")
    print(rule)
    print(f"Total months: {len(dataset['historical_data'])}")
    print(f"Number of watch models: {len(dataset['metadata']['watches'])}")

    # Headline figures per watch for the very first month.
    print("\n" + rule)
    print("SAMPLE DATA (Month 1)")
    print(rule)
    opening_month = dataset['historical_data'][0]
    print(f"Date: {opening_month['date']}")
    for entry in opening_month['watches']:
        print(f"\n{entry['watch_name']}:")
        print(f"  Demand: {entry['demand']} units")
        print(f"  Production: {entry['production']} units")
        print(f"  Units Sold: {entry['units_sold']} units")
        print(f"  Revenue: CHF {entry['revenue']:,.2f}")
        print(f"  Profit: CHF {entry['profit']:,.2f}")

    print("\n" + rule)
    print("✓ Data generation complete!")
    print(rule)


# Only run the demo when executed as a script, not on import.
if __name__ == "__main__":
    main()