diff --git a/examples/demos/logistics/crca_supply_shock_agent.py b/examples/demos/logistics/crca_supply_shock_agent.py new file mode 100644 index 00000000..3cee1163 --- /dev/null +++ b/examples/demos/logistics/crca_supply_shock_agent.py @@ -0,0 +1,1887 @@ +import math +from dataclasses import dataclass +from typing import Dict, List, Tuple, Optional, Any + +import numpy as np +import pandas as pd +from loguru import logger +import json +import re +import warnings + +from swarms import Agent +from swarms.agents import CRCAAgent + + +SUPPLY_AGENT_PROMPT = """ +You are an expert supply chain analyst with deep knowledge of multi-echelon inventory, logistics, and robust control. +You analyze telemetry and SCM outputs, apply causal reasoning, and propose safe, actionable interventions. + +Data-awareness: +- Consider lead time (L), survival/transport factor (phi), capacity (K), backlog (B), inventory (I), demand (D), orders (O), receipts (R), price (p). +- Reference learned causal edges when explaining recommendations (e.g., B→p+, I→p−, dcost→p+, L→R−, D→O+). + +Uncertainty and risk: +- Note regime shifts or drift (EWMA/CUSUM/BOCPD). Prefer conservative actions when uncertainty is high. +- Propose z_alpha updates, reroute/expedite shares with CVaR-style caution and execution realism. + +Guardrails: +- Avoid claims beyond the data window. Keep recommendations feasible under capacity and service constraints. +- Prefer stable signals and explain trade-offs (service vs. cost vs. bullwhip). + +Output structure (always include these 7 sections): +1) Drivers: succinct causal drivers (e.g., L↑→R↓→I↓→B↑→p↑; dcost↑→p↑→D↓) +2) Regime/Alerts: note EWMA/CUSUM/BOCPD, utilization, and stability +3) Proposal: recommended z_alpha, reroute/expedite, and rationale +4) Expected Impact: service, cost proxy, bullwhip changes (direction and rough magnitude) +5) Risks/Uncertainty: cite instability or wide uncertainty; suggest mitigations +6) Counterfactuals: 1–2 do()-style scenarios and expected KPI shifts +7) Actionables: concrete next steps and monitoring items + +Learned DAG alignment: +- Mirror the learned DAG strengths exactly in your explanation. Do not claim effects whose learned strength is ~0. If L→R≈0 or R→I≈0, reflect that and avoid relying on those edges. Base rationale on the actual learned edges provided. + +After the 7 sections, you MUST output a final JSON object on a new line with the exact schema: +{ + "proposal": {"z_alpha_new": number, "reroute_share": number, "expedite_share": number, "dual_source_share": number}, + "notes": string, + "expected_deltas": {"service": number, "cost_proxy": number, "bullwhip": number}, + "ci80": {"service": [low, high], "cost_proxy": [low, high], "bullwhip": [low, high]}, + "feasible": boolean +} +Numbers should be in [0, 3] for z_alpha_new, [0, 1] for shares. +""" + + +@dataclass +class SKUKey: + sku: str + facility: str + + +@dataclass +class PolicyParams: + z_alpha: float = 1.28 # base safety factor (approx 90%) + theta_smoothing: float = 0.35 # order smoothing parameter + + +@dataclass +class Elasticities: + eta_c: float = 0.3 # pass-through elasticity to price from cost changes + eta_B: float = 0.5 # price increases with backlog + eta_I: float = 0.3 # price decreases with inventory + + +class SupplyShockCRCAgent: + """CR-CA Supply-Shock Agent for multi-period inventory flows and interventions. + + Implements SCM flow, queueing-derived lead times, pricing pass-through, and + integrates CRCAAgent for causal analysis and do()-counterfactuals. + """ + + def __init__( + self, + skus: List[SKUKey], + T: int = 60, + policy: PolicyParams = PolicyParams(), + el: Elasticities = Elasticities(), + seed: int = 7, + ) -> None: + self.skus = skus + self.T = T + self.policy = policy + self.el = el + self.rng = np.random.default_rng(seed) + + # Core data containers (panel: period x (sku,facility)) + index = pd.MultiIndex.from_tuples( + [(t, k.sku, k.facility) for t in range(T) for k in skus], + names=["t", "sku", "facility"], + ) + self.df = pd.DataFrame(index=index) + + # Initialize states with simple priors + self.df["D"] = self.rng.poisson(100, len(self.df)) # demand + self.df["I"] = 120.0 # on-hand + self.df["B"] = 0.0 # backorder + self.df["P"] = 80.0 # pipeline + self.df["O"] = 0.0 # orders placed + self.df["R"] = 0.0 # receipts + self.df["K"] = 1e6 # facility capacity (big default) + self.df["phi"] = 1.0 # spoilage/survival fraction + self.df["L"] = self.rng.integers(1, 4, len(self.df)).astype(float) # lead time in periods (float for updates) + self.df["c"] = 10.0 # unit cost + self.df["dcost"] = 0.0 # cost change + self.df["p_bar"] = 15.0 + self.df["p"] = self.df["p_bar"] + self.df["D_ref"] = 100.0 + + # CR-CA causal layer + self.crca = CRCAAgent( + name="crca-supply-shock", + description="Causal layer for supply shocks and policy", + model_name="gpt-4o-mini", + max_loops=2, + ) + self._build_causal_graph() + + # VSM overlay components (S2-S5) + self.vsm = VSMOverlay() + + # Narrative LLM agent (graceful init) + try: + self.agent = Agent( + agent_name="CRCA-Supply-Shock-Agent", + system_prompt=SUPPLY_AGENT_PROMPT, + model_name="gpt-4o", + max_loops=1, + autosave=False, + dashboard=True, + verbose=False, + dynamic_temperature_enabled=True, + context_length=200000, + output_type="string", + streaming_on=False, + ) + except Exception as e: + logger.warning(f"LLM Agent init failed (narrative disabled): {e}") + self.agent = None + + # Feasibility config and last applied controls + self.lane_capacity: float = 0.35 # max share sum for reroute+expedite+dual_source per cycle + self.max_weekly_change: Dict[str, float] = {"z_alpha_new": 0.3, "reroute_share": 0.15, "expedite_share": 0.15, "dual_source_share": 0.2} + self.last_controls: Dict[str, float] = {"z_alpha_new": self.policy.z_alpha, "reroute_share": 0.0, "expedite_share": 0.0, "dual_source_share": 0.0} + # KPI normalization scalers + self.kpi_units = {"service": "%", "cost_proxy": "currency", "bullwhip": "ratio"} + self.cost_unit_multiplier = 1.0 + # KPI/action history and RL weights + self.kpi_history: List[Dict[str, float]] = [] + self.action_history: List[Dict[str, float]] = [] + self.mpc_weights: Dict[str, float] = {"service": 1.0, "cost": 1.0, "bullwhip": 0.5} + # Dynamic data feeds + self._feeds: Dict[str, Any] = {} + # Direct pricing lever (applied as an offset to p_bar during simulate) + self._price_adjust: float = 0.0 + + # ===== Helper: logistics lever support from learned DAG ===== + def _logistics_support_from_strengths(self, strengths: Dict[str, float], tol: float = 0.05) -> bool: + keys = ["L->R", "phi->R", "K->R"] + return any(abs(float(strengths.get(k, 0.0))) > tol for k in keys) + + # ===== Helper: calibrate z to target service via short grid search ===== + def calibrate_z_to_service(self, target_service: float = 0.95, z_grid: Optional[np.ndarray] = None) -> float: + if z_grid is None: + z_grid = np.linspace(0.8, 2.6, 10) + best_z = self.policy.z_alpha + best_val = float("inf") + # Snapshot current policy + z_prev = self.policy.z_alpha + try: + for z in z_grid: + self.policy.z_alpha = float(z) + df = self.simulate() + kpi = self.summarize(df) + # distance to target with slight cost penalty to avoid extreme z + val = abs(float(kpi.get("service_level", 0.0)) - target_service) + 0.01 * max(0.0, float(kpi.get("cost_proxy", 0.0))) + if val < best_val: + best_val = val + best_z = float(z) + finally: + self.policy.z_alpha = z_prev + return best_z + + # ===== Helper: minimal Pareto grid on (z, r, e) ===== + def pareto_front(self, base_z: float, allow_logistics: bool, trials: int = 30, allow_price: bool = False) -> List[Dict[str, Any]]: + z_vals = np.clip(np.linspace(base_z - 0.2, base_z + 0.2, 5), 0.5, 3.0) + r_vals = [0.0, 0.05, 0.1] if allow_logistics else [0.0] + e_vals = [0.0, 0.05, 0.1] if allow_logistics else [0.0] + p_vals = [0.0, -0.5, -1.0] if allow_price else [0.0] + points: List[Dict[str, Any]] = [] + for z in z_vals: + for r in r_vals: + for e in e_vals: + for p in p_vals: + imp = self._quantified_impact(float(z), float(r), float(e), trials=trials, price_adjust=float(p)) + exp = imp.get("expected", {}) + points.append({ + "z_alpha_new": float(z), + "reroute_share": float(r), + "expedite_share": float(e), + "price_adjust": float(p), + "expected": exp, + "ci80": imp.get("ci80", {}), + "cvar_loss": imp.get("cvar_loss", 0.0), + }) + # Pareto filter: maximize service, minimize cost and bullwhip + def dominates(a: Dict[str, Any], b: Dict[str, Any]) -> bool: + ea, eb = a["expected"], b["expected"] + svc_a, cost_a, bw_a = ea.get("service", 0.0), ea.get("cost_proxy", 0.0), ea.get("bullwhip", 0.0) + svc_b, cost_b, bw_b = eb.get("service", 0.0), eb.get("cost_proxy", 0.0), eb.get("bullwhip", 0.0) + return (svc_a >= svc_b and cost_a <= cost_b and bw_a <= bw_b) and (svc_a > svc_b or cost_a < cost_b or bw_a < bw_b) + pareto: List[Dict[str, Any]] = [] + for p in points: + if not any(dominates(q, p) for q in points if q is not p): + pareto.append(p) + return pareto + + def z_service_curve(self, z_values: Optional[np.ndarray] = None) -> List[Dict[str, float]]: + if z_values is None: + z_values = np.linspace(0.8, 2.6, 10) + curve: List[Dict[str, float]] = [] + z_prev = self.policy.z_alpha + try: + for z in z_values: + self.policy.z_alpha = float(z) + df = self.simulate() + k = self.summarize(df) + curve.append({"z": float(z), "service": float(k.get("service_level", 0.0)), "cost_proxy": float(k.get("cost_proxy", 0.0))}) + finally: + self.policy.z_alpha = z_prev + return curve + + # ===== Real-world KPI ingestion ===== + def ingest_kpis(self, kpis: Dict[str, float]) -> None: + """Ingest external KPIs (e.g., service %, cost, bullwhip) and store history.""" + safe = { + "service": float(kpis.get("service", np.nan)), + "cost_proxy": float(kpis.get("cost_proxy", np.nan)), + "bullwhip": float(kpis.get("bullwhip", np.nan)), + } + self.kpi_history.append(safe) + + # ===== Dynamic data feeds ===== + def register_feed(self, name: str, fetch_fn: Any) -> None: + """Register a callable that returns dicts to merge into df or KPIs.""" + self._feeds[name] = fetch_fn + + def poll_feeds(self) -> Dict[str, Any]: + """Poll all feeds and merge into state; return a snapshot of updates.""" + updates: Dict[str, Any] = {} + for name, fn in list(self._feeds.items()): + try: + data = fn() + updates[name] = data + # If KPI-like, ingest; else if dataframe-like keys, merge shallowly + if isinstance(data, dict) and set(["service", "cost_proxy", "bullwhip"]).issubset(set(data.keys())): + self.ingest_kpis(data) + # Extend here to merge time-series; keeping simple for now + except Exception as e: + updates[name] = {"error": str(e)} + return updates + + # ===== Action validation and rollback ===== + def validate_and_rollback( + self, + new_kpis: Dict[str, float], + thresholds: Dict[str, float] = None, + ) -> Dict[str, Any]: + """Validate last applied action against fresh KPIs; rollback if violated. + + thresholds: {"min_service_gain": 0.0, "max_cost_increase": 0.0, "max_bullwhip_increase": 0.0} + """ + if thresholds is None: + thresholds = {"min_service_gain": 0.0, "max_cost_increase": 0.0, "max_bullwhip_increase": 0.0} + if not self.kpi_history: + self.ingest_kpis(new_kpis) + return {"rolled_back": False, "reason": "no_baseline"} + prev = self.kpi_history[-1] + self.ingest_kpis(new_kpis) + ds = float(new_kpis.get("service", 0.0) - (prev.get("service", 0.0) if prev.get("service") is not None else 0.0)) + dc = float(new_kpis.get("cost_proxy", 0.0) - (prev.get("cost_proxy", 0.0) if prev.get("cost_proxy") is not None else 0.0)) + db = float(new_kpis.get("bullwhip", 0.0) - (prev.get("bullwhip", 0.0) if prev.get("bullwhip") is not None else 0.0)) + violate = (ds < thresholds["min_service_gain"]) or (dc > thresholds["max_cost_increase"]) or (db > thresholds["max_bullwhip_increase"]) + if violate and self.action_history: + # rollback: revert to previous controls, dampen risky knobs + last = self.action_history[-1] + self.policy.z_alpha = float(last.get("z_alpha_prev", self.policy.z_alpha)) + self.last_controls["z_alpha_new"] = self.policy.z_alpha + for k in ["reroute_share", "expedite_share", "dual_source_share"]: + self.last_controls[k] = max(0.0, self.last_controls.get(k, 0.0) * 0.5) + return {"rolled_back": True, "reason": "threshold_violation", "delta": {"service": ds, "cost": dc, "bullwhip": db}} + return {"rolled_back": False, "reason": "ok", "delta": {"service": ds, "cost": dc, "bullwhip": db}} + + # ===== Reinforcement-based self-tuning ===== + def reinforce_from_outcome(self, expected: Dict[str, float]) -> Dict[str, float]: + """Update MPC weights from outcome using a simple reward: r = dS - a*max(0,dC) - b*max(0,dB).""" + dS = float(expected.get("service", 0.0)) + dC = float(expected.get("cost_proxy", 0.0)) + dB = float(expected.get("bullwhip", 0.0)) + a, b, lr = 1.0, 0.5, 0.1 + reward = dS - a * max(0.0, dC) - b * max(0.0, dB) + # Increase emphasis on service if reward positive; else increase cost/bullwhip penalty + if reward >= 0: + self.mpc_weights["service"] = float(min(2.0, self.mpc_weights.get("service", 1.0) + lr * reward)) + self.mpc_weights["cost"] = float(max(0.2, self.mpc_weights.get("cost", 1.0) * (1.0 - 0.05))) + else: + self.mpc_weights["cost"] = float(min(2.0, self.mpc_weights.get("cost", 1.0) + lr * (-reward))) + self.mpc_weights["bullwhip"] = float(min(2.0, self.mpc_weights.get("bullwhip", 0.5) + lr * 0.5 * (-reward))) + return dict(self.mpc_weights) + + def _feasible(self, proposal: Dict[str, Any]) -> bool: + rr = float(proposal.get("reroute_share", 0.0)) + ex = float(proposal.get("expedite_share", 0.0)) + ds = float(proposal.get("dual_source_share", 0.0)) + if rr < 0 or ex < 0: + return False + if rr + ex + ds > self.lane_capacity: + return False + # rate limits + for k, cap in self.max_weekly_change.items(): + prev = float(self.last_controls.get(k, 0.0)) + cur = float(proposal.get(k, prev)) + if abs(cur - prev) > cap: + return False + return True + + def _quantified_impact(self, z_new: float, rr: float, ex: float, trials: int = 100, price_adjust: float = 0.0, alpha: float = 0.9) -> Dict[str, Any]: + # Run small Monte Carlo by injecting noise on D and L; measure KPI deltas + base = self.simulate() + base_kpi = self.summarize(base) + deltas = [] + rng = self.rng + saved_price_adj = self._price_adjust + for _ in range(trials): + # stochastic shocks via temporary tweaks + shock = {"stochastic": True} + self.policy.z_alpha = z_new + # approximate expedite/reroute: reduce L and increase phi within shares + self.df["phi"] = np.clip(self.df["phi"] * (1.0 + 0.1 * rr), 0.2, 1.2) + self.df["L"] = np.clip(self.df["L"] * (1.0 - 0.2 * ex), 1.0, None) + self._price_adjust = float(price_adjust) + sim = self.simulate(interventions=None) + kpi = self.summarize(sim) + deltas.append({ + "service": float(kpi["service_level"] - base_kpi["service_level"]), + "cost_proxy": float(kpi["cost_proxy"] - base_kpi["cost_proxy"]), + "bullwhip": float(kpi["bullwhip"] - base_kpi["bullwhip"]), + }) + # restore controls + self._price_adjust = saved_price_adj + # Aggregate + svc = np.array([d["service"] for d in deltas]) + cst = np.array([d["cost_proxy"] for d in deltas]) + bwe = np.array([d["bullwhip"] for d in deltas]) + def ci80(arr: np.ndarray) -> Tuple[float, float]: + return float(np.quantile(arr, 0.1)), float(np.quantile(arr, 0.9)) + def ci95(arr: np.ndarray) -> Tuple[float, float]: + return float(np.quantile(arr, 0.025)), float(np.quantile(arr, 0.975)) + loss = cst - svc + bwe + thr = float(np.quantile(loss, alpha)) + cvar = float(loss[loss >= thr].mean()) if np.any(loss >= thr) else float(loss.mean()) + return { + "expected": {"service": float(np.mean(svc)), "cost_proxy": float(np.mean(cst)), "bullwhip": float(np.mean(bwe))}, + "ci80": {"service": ci80(svc), "cost_proxy": ci80(cst), "bullwhip": ci80(bwe)}, + "ci95": {"service": ci95(svc), "cost_proxy": ci95(cst), "bullwhip": ci95(bwe)}, + "samples": {"service": svc.tolist(), "cost_proxy": cst.tolist(), "bullwhip": bwe.tolist()}, + "cvar_alpha": alpha, + "cvar_loss": cvar, + } + + # ===== CVaR grid minimizer (discrete neighborhood search) ===== + def cvar_select_from_grid( + self, + base: Dict[str, float], + alpha: float = 0.9, + eps: float = 0.05, + trials: int = 40, + weights: Optional[Dict[str, float]] = None, + ) -> Dict[str, Any]: + """Pick among a small grid around base (z,r,e,ds) to minimize CVaR_alpha of loss. + + loss = w_cost*Δcost - w_service*Δservice + w_bw*Δbullwhip + """ + if weights is None: + weights = {"cost": 1.0, "service": 1.0, "bullwhip": 0.5} + z0 = float(base.get("z_alpha_new", self.policy.z_alpha)) + r0 = float(base.get("reroute_share", 0.0)) + e0 = float(base.get("expedite_share", 0.0)) + ds0 = float(base.get("dual_source_share", 0.0)) + cand = [ + {"z": z0, "r": r0, "e": e0}, + {"z": z0+eps, "r": r0, "e": e0}, + {"z": z0, "r": r0+eps, "e": e0}, + {"z": z0, "r": r0, "e": e0+eps}, + {"z": z0-eps, "r": r0, "e": e0}, + ] + best = None + best_val = float("inf") + tail_q = alpha + for c in cand: + imp = self._quantified_impact(c["z"], c["r"], c["e"], trials=trials) + svc = np.array(imp["samples"]["service"]) # Δservice + cst = np.array(imp["samples"]["cost_proxy"]) # Δcost + bwe = np.array(imp["samples"]["bullwhip"]) # Δbullwhip + loss = weights["cost"] * cst - weights["service"] * svc + weights["bullwhip"] * bwe + thresh = np.quantile(loss, tail_q) + cvar = float(loss[loss >= thresh].mean()) if np.any(loss >= thresh) else float(loss.mean()) + if cvar < best_val: + best_val = cvar + best = {"z_alpha_new": c["z"], "reroute_share": c["r"], "expedite_share": c["e"], "dual_source_share": ds0} + return {"choice": best, "cvar": best_val, "alpha": alpha} + + @staticmethod + def _extract_final_json(text: str) -> Optional[Dict[str, Any]]: + try: + # Find the last JSON-like block with required keys + matches = re.findall(r"\{[\s\S]*?\}", text) + for chunk in reversed(matches): + if '"proposal"' in chunk and '"z_alpha_new"' in chunk: + return json.loads(chunk) + except Exception: + return None + return None + + @staticmethod + def _validate_proposal(p: Dict[str, Any]) -> Optional[Dict[str, float]]: + try: + z = float(np.clip(float(p.get("z_alpha_new")), 0.0, 3.0)) + rr = float(np.clip(float(p.get("reroute_share")), 0.0, 1.0)) + ex = float(np.clip(float(p.get("expedite_share")), 0.0, 1.0)) + return {"z_alpha_new": z, "reroute_share": rr, "expedite_share": ex} + except Exception: + return None + + def _llm_decision_loop( + self, + panel: pd.DataFrame, + s2: Dict[str, Any], + proposal: Optional[Dict[str, Any]], + decision: Dict[str, Any], + strengths: Dict[str, float], + kpis: Dict[str, Any], + rounds: int = 3, + ) -> Tuple[str, Optional[Dict[str, Any]]]: + if self.agent is None: + return "analysis_unavailable: agent_not_initialized", proposal + + alerts = s2.get("alerts", []) + ai_out: Any = "" + for r in range(rounds): + # Edge guidance for narration + top_edges = sorted(strengths.items(), key=lambda kv: abs(kv[1]), reverse=True)[:5] + weak_edges = [k for k, v in strengths.items() if abs(v) <= 0.05] + narrative_prompt = ( + f"Round {r+1}/{rounds}. Analyze state and propose safe actions.\n" + f"KPIs: service={kpis.get('service_level', 0.0):.2f}, cost_proxy={kpis.get('cost_proxy', 0.0):.2f}, bullwhip={kpis.get('bullwhip', 0.0):.2f}.\n" + f"Alerts: {alerts}.\n" + f"Current proposal: {proposal}. Decision: {decision}.\n" + f"Causal edges (use these): {dict(top_edges)}; avoid near-zero edges: {weak_edges}.\n" + f"Provide the 7-section output and the final JSON exactly per schema." + ) + try: + ai_out = self.agent.run(narrative_prompt) + except Exception as e: + return f"analysis_unavailable: {e}", proposal + llm_json = self._extract_final_json(str(ai_out)) if isinstance(ai_out, str) else None + if llm_json and isinstance(llm_json.get("proposal"), dict): + validated = self._validate_proposal(llm_json["proposal"]) + if validated: + proposal = {**validated, "source": "llm"} + decision = self.vsm.s5_policy_gate(proposal, s2, self.vsm.s3_star_audit(panel), risk_cap_cvar=0.15) + return str(ai_out), proposal + + def _build_causal_graph(self) -> None: + g = self.crca.causal_graph + g.clear() + vars_graph = [ + "L", # lead time + "phi", # survival/transport factor + "K", # capacity + "dcost", + "B", # backlog + "I", # inventory + "p", # price + "D", # demand + "O", # orders + "R", # receipts + ] + g.add_nodes_from(vars_graph) + # Structural influences (signs where sensible) + self.crca.add_causal_relationship("K", "R", strength=0.0) # more cap -> more receipts + self.crca.add_causal_relationship("phi", "R", strength=0.0) # more survival -> more receipts + self.crca.add_causal_relationship("L", "R", strength=0.0) # longer lead -> lower timely receipts + self.crca.edge_sign_constraints[("L", "R")] = -1 + + self.crca.add_causal_relationship("B", "p", strength=0.0) # backlog -> higher price + self.crca.edge_sign_constraints[("B", "p")] = 1 + self.crca.add_causal_relationship("I", "p", strength=0.0) # inventory -> lower price + self.crca.edge_sign_constraints[("I", "p")] = -1 + self.crca.add_causal_relationship("dcost", "p", strength=0.0) # cost pass-through + self.crca.edge_sign_constraints[("dcost", "p")] = 1 + + self.crca.add_causal_relationship("p", "D", strength=0.0) # pricing impacts demand + self.crca.edge_sign_constraints[("p", "D")] = -1 + self.crca.add_causal_relationship("D", "O", strength=0.0) # more demand -> more orders + self.crca.edge_sign_constraints[("D", "O")] = 1 + self.crca.add_causal_relationship("R", "I", strength=0.0) + self.crca.edge_sign_constraints[("R", "I")] = 1 + self.crca.add_causal_relationship("D", "B", strength=0.0) + self.crca.edge_sign_constraints[("D", "B")] = 1 + + @staticmethod + def _relu(x: float) -> float: + return float(max(0.0, x)) + + def _arrivals(self, O_hist: List[float], L_hist: List[int], phi_hist: List[float], t: int) -> float: + """Receipts at t: sum 1[L=ell]*O[t-ell]*phi[t-ell->t].""" + total = 0.0 + for ell in range(1, min(10, t + 1)): + if L_hist[t] == ell: + total += O_hist[t - ell] * phi_hist[t - ell] + return total + + def _queueing_leadtime(self, lam: float, mu: float, transport: float = 0.0) -> float: + rho = min(0.95, lam / max(mu, 1e-6)) + wq = (rho / (mu * max(1e-6, (1 - rho)))) if rho < 0.999 else 10.0 + return wq + (1.0 / max(mu, 1e-6)) + transport + + def _price_pass_through(self, p_bar: float, dcost: float, B: float, I: float, D_ref: float) -> float: + return ( + p_bar + + self.el.eta_c * dcost + + self.el.eta_B * (B / max(D_ref, 1e-6)) + - self.el.eta_I * (I / max(D_ref, 1e-6)) + ) + + def simulate(self, interventions: Optional[Dict[str, Any]] = None) -> pd.DataFrame: + """Run minimal multi-period SCM with optional do()-style interventions. + + interventions: dict with keys like {'outage_facility': {'F1': 1}, 'disruption_route': {'R1': 1}} + For simplicity, treat interventions as shocks to K (capacity), L (lead), and phi. + """ + df = self.df.copy() + # Initialize per (sku,facility) + for (sku, fac), sub in df.groupby(level=["sku", "facility" ]): + O = [0.0] * self.T + R = [0.0] * self.T + I = [float(sub.iloc[0]["I"])] + [0.0] * (self.T - 1) + B = [float(sub.iloc[0]["B"])] + [0.0] * (self.T - 1) + P = [float(sub.iloc[0]["P"])] + [0.0] * (self.T - 1) + L_hist = [int(sub.iloc[min(i, len(sub)-1)]["L"]) for i in range(self.T)] + phi_hist = [float(sub.iloc[min(i, len(sub)-1)]["phi"]) for i in range(self.T)] + K_hist = [float(sub.iloc[min(i, len(sub)-1)]["K"]) for i in range(self.T)] + + # Apply high-level interventions (if any) + for t in range(self.T): + if interventions: + if interventions.get("outage_facility", {}).get(fac, 0) == 1: + K_hist[t] = K_hist[t] * 0.7 # 30% capacity loss + if interventions.get("disruption_route", {}).get(fac, 0) == 1: + L_hist[t] = max(1, L_hist[t] + 1) + phi_hist[t] = max(0.2, phi_hist[t] * 0.8) + + # Period loop + for t in range(self.T): + # Demand and cost dynamics (toy): + D_t = float(sub.iloc[t]["D"]) if t < len(sub) else 100.0 + dcost_t = float(sub.iloc[t]["dcost"]) if t < len(sub) else 0.0 + pbar = (float(sub.iloc[t]["p_bar"]) if t < len(sub) else 15.0) + float(self._price_adjust) + + # Receipts from earlier orders + R[t] = self._arrivals(O, L_hist, phi_hist, t) + + # Shipments and inventory/backorders + S_t = min(I[t - 1] + (R[t] if t > 0 else 0.0), D_t + (B[t - 1] if t > 0 else 0.0)) + if t > 0: + I[t] = I[t - 1] + R[t] - S_t + B[t] = max(0.0, D_t + B[t - 1] - (I[t - 1] + R[t])) + + # Demand response to price + p_t = self._price_pass_through(pbar, dcost_t, B[t], I[t], float(sub.iloc[0]["D_ref"])) + # Simple log-linear elasticity around reference + D_t_eff = max(0.0, D_t * math.exp(-0.01 * (p_t - pbar))) + + # Lead-time estimate (M/M/1 rough cut) + lam = D_t_eff + mu = max(1e-6, K_hist[t] / max(1.0, len(self.skus))) + L_eff = max(1.0, self._queueing_leadtime(lam, mu, transport=float(L_hist[t]) - 1.0)) + + # Base-stock target and orders + POS_t = I[t] + P[t] + muL = L_eff + sigL = 0.5 * muL + target = muL * D_t_eff + self.policy.z_alpha * sigL * math.sqrt(max(1e-6, D_t_eff)) + O_policy = self._relu(target - POS_t) + # Order smoothing + O_prev = O[t - 1] if t > 0 else 0.0 + O[t] = (1 - self.policy.theta_smoothing) * O_prev + self.policy.theta_smoothing * O_policy + + # Update pipeline (very simplified) + if t < self.T - 1: + P[t + 1] = max(0.0, P[t] + O[t] - R[t]) + + # Capacity constraint (aggregate receipts) + R[t] = min(R[t], K_hist[t]) + + # Write back to df + df.loc[(t, sku, fac), ["R", "I", "B", "O", "p", "L", "phi", "K"]] = [ + R[t], I[t], B[t], O[t], p_t, L_eff, phi_hist[t], K_hist[t] + ] + + return df + + # ===== Reporting ===== + def summarize(self, df: pd.DataFrame) -> Dict[str, Any]: + out: Dict[str, Any] = {} + # Simple KPIs + service = 1.0 - (df["B"].groupby(level="t").mean() > 0).mean() + holding_cost = df["I"].clip(lower=0).mean() * 0.1 + shortage_penalty = df["B"].mean() * 1.0 + ordering_cost = df["O"].mean() * df["c"].mean() + cost = holding_cost + shortage_penalty + ordering_cost + bwe = float((df["O"].var() / max(df["D"].var(), 1e-6))) + out["service_level"] = float(service) + out["cost_proxy"] = float(cost) + out["bullwhip"] = bwe + return out + + # ===== Causal runs (Upgraded) ===== + def causal_edges(self, df: pd.DataFrame) -> Dict[str, float]: + """Fit causal edges and return strengths, now with enhanced analysis.""" + # Fit on panel averages per t + panel = df.groupby(level="t").mean(numeric_only=True) + vars_fit = [c for c in ["L", "phi", "K", "dcost", "B", "I", "p", "D", "O", "R"] if c in panel.columns] + try: + self.crca.fit_from_dataframe(panel, variables=vars_fit, window=min(30, len(panel)), decay_alpha=0.9, ridge_lambda=0.1) + except Exception as e: + logger.warning(f"CRCA fit skipped: {e}") + strengths = {} + for u, v in self.crca.causal_graph.edges(): + strengths[f"{u}->{v}"] = float(self.crca.causal_graph[u][v].get("strength", 0.0)) + return strengths + + def advanced_causal_analysis(self, df: pd.DataFrame) -> Dict[str, Any]: + """Run comprehensive advanced causal analysis using all upgraded CR-CA methods.""" + panel = df.groupby(level="t").mean(numeric_only=True) + vars_fit = [c for c in ["L", "phi", "K", "dcost", "B", "I", "p", "D", "O", "R"] if c in panel.columns] + + # Fit causal graph first + try: + self.crca.fit_from_dataframe(panel, variables=vars_fit, window=min(30, len(panel)), decay_alpha=0.9, ridge_lambda=0.1) + except Exception as e: + logger.warning(f"CRCA fit skipped: {e}") + return {"error": str(e)} + + # Get latest state for analysis + latest = panel.iloc[-1] if len(panel) > 0 else {} + factual_state = {var: float(latest.get(var, 0.0)) for var in vars_fit if var in latest} + + # ========== UPGRADED METHODS ========== + results: Dict[str, Any] = { + "causal_strengths": {}, + "sensitivity_analysis": {}, + "granger_causality": {}, + "information_theory": {}, + "var_model": {}, + "bayesian_edges": {}, + "root_causes": {}, + "shapley_attribution": {}, + "whatif_analysis": {}, + "optimal_intervention": {}, + "alternate_realities": {}, + } + + # Get causal strengths + for u, v in self.crca.causal_graph.edges(): + results["causal_strengths"][f"{u}->{v}"] = float(self.crca.causal_graph[u][v].get("strength", 0.0)) + + # 1. Sensitivity Analysis: What drives service/cost changes most? + try: + if len(factual_state) >= 4: + intervention_vars = [v for v in ["L", "phi", "K", "dcost", "B"] if v in factual_state][:4] + test_intervention = {k: factual_state.get(k, 0.0) for k in intervention_vars} + + # Analyze sensitivity to service (via B→p→D chain) and cost + sensitivity_service = self.crca.sensitivity_analysis( + intervention=test_intervention, + target="B", # Backlog affects service + perturbation_size=0.01, + ) + sensitivity_cost = self.crca.sensitivity_analysis( + intervention={k: v for k, v in test_intervention.items() if k in ["dcost", "L", "phi"]}, + target="p", # Price affects cost + perturbation_size=0.01, + ) + results["sensitivity_analysis"] = { + "service_drivers": sensitivity_service, + "cost_drivers": sensitivity_cost, + } + logger.info(f"Sensitivity: service={sensitivity_service.get('most_influential_variable', 'N/A')}, cost={sensitivity_cost.get('most_influential_variable', 'N/A')}") + except Exception as e: + logger.debug(f"Sensitivity analysis failed: {e}") + + # 2. Granger Causality: Temporal causal relationships + try: + if len(panel) >= 20: + # Test if demand Granger-causes orders + granger_d_o = self.crca.granger_causality_test( + df=panel, + var1="D", + var2="O", + max_lag=3, + ) + # Test if backlog Granger-causes price + granger_b_p = self.crca.granger_causality_test( + df=panel, + var1="B", + var2="p", + max_lag=2, + ) + results["granger_causality"] = { + "demand_granger_causes_orders": granger_d_o.get("granger_causes", False), + "backlog_granger_causes_price": granger_b_p.get("granger_causes", False), + "d_o_f_stat": granger_d_o.get("f_statistic", 0.0), + "b_p_f_stat": granger_b_p.get("f_statistic", 0.0), + } + logger.info(f"Granger causality: D→O={granger_d_o.get('granger_causes', False)}, B→p={granger_b_p.get('granger_causes', False)}") + except Exception as e: + logger.debug(f"Granger causality test failed: {e}") + + # 3. Information Theoretic Measures + try: + if len(panel) >= 10: + core_vars = [v for v in ["D", "O", "B", "I", "p", "L"] if v in panel.columns] + if len(core_vars) >= 3: + info_theory = self.crca.compute_information_theoretic_measures( + df=panel, + variables=core_vars, + ) + results["information_theory"] = info_theory + logger.info(f"Information theory: {len(info_theory.get('entropies', {}))} entropies computed") + except Exception as e: + logger.debug(f"Information theory computation failed: {e}") + + # 4. VAR Model: Vector Autoregression + try: + if len(panel) >= 30: + var_vars = [v for v in ["D", "O", "I", "B", "p"] if v in panel.columns] + if len(var_vars) >= 2: + var_model = self.crca.vector_autoregression_estimation( + df=panel, + variables=var_vars, + max_lag=2, + ) + results["var_model"] = var_model + logger.info(f"VAR model: {var_model.get('n_variables', 0)} variables, lag={var_model.get('max_lag', 0)}") + except Exception as e: + logger.debug(f"VAR estimation failed: {e}") + + # 5. Bayesian Edge Inference + try: + bayesian_edges = {} + key_edges = [("B", "p"), ("I", "p"), ("dcost", "p"), ("D", "O"), ("L", "R")] + for parent, child in key_edges: + if parent in panel.columns and child in panel.columns: + bayes_result = self.crca.bayesian_edge_inference( + df=panel, + parent=parent, + child=child, + prior_mu=0.0, + prior_sigma=1.0, + ) + if "error" not in bayes_result: + bayesian_edges[f"{parent}->{child}"] = { + "posterior_mean": bayes_result.get("posterior_mean", 0.0), + "posterior_std": bayes_result.get("posterior_std", 0.0), + "credible_interval": bayes_result.get("credible_interval_95", (0.0, 0.0)), + } + results["bayesian_edges"] = bayesian_edges + if bayesian_edges: + logger.info(f"Bayesian inference for {len(bayesian_edges)} edges") + except Exception as e: + logger.debug(f"Bayesian inference failed: {e}") + + # 6. Deep Root Cause Analysis: Find ultimate drivers of service/cost issues + try: + root_causes_service = self.crca.deep_root_cause_analysis( + problem_variable="B", # Backlog is service issue + max_depth=8, + min_path_strength=0.01, + ) + root_causes_cost = self.crca.deep_root_cause_analysis( + problem_variable="p", # Price is cost proxy + max_depth=8, + min_path_strength=0.01, + ) + results["root_causes"] = { + "service_issues": root_causes_service, + "cost_issues": root_causes_cost, + } + if root_causes_service.get("ultimate_root_causes"): + logger.info(f"Root causes (service): {[rc.get('root_cause') for rc in root_causes_service.get('ultimate_root_causes', [])[:3]]}") + except Exception as e: + logger.debug(f"Root cause analysis failed: {e}") + + # 7. Shapley Value Attribution: Fair attribution of KPI drivers + try: + if len(panel) >= 7: + # Baseline: average over last week + baseline_state = { + k: float(panel[k].tail(7).mean()) + for k in factual_state.keys() + if k in panel.columns + } + if baseline_state and "B" in baseline_state: + shapley_backlog = self.crca.shapley_value_attribution( + baseline_state=baseline_state, + target_state=factual_state, + target="B", + ) + if "p" in baseline_state: + shapley_price = self.crca.shapley_value_attribution( + baseline_state=baseline_state, + target_state=factual_state, + target="p", + ) + results["shapley_attribution"] = { + "backlog_drivers": shapley_backlog, + "price_drivers": shapley_price, + } + logger.info(f"Shapley attribution computed for B and p") + except Exception as e: + logger.debug(f"Shapley attribution failed: {e}") + + # 8. Multi-layer What-If Analysis: Cascading effects of disruptions + try: + test_scenarios = [ + {"L": factual_state.get("L", 2.0) * 1.5}, # Lead time disruption + {"phi": max(0.2, factual_state.get("phi", 1.0) * 0.7)}, # Survival rate drop + {"dcost": factual_state.get("dcost", 0.0) + 2.0}, # Cost shock + ] + whatif_analysis = self.crca.multi_layer_whatif_analysis( + scenarios=test_scenarios, + depth=3, + ) + results["whatif_analysis"] = whatif_analysis + logger.info(f"What-if analysis: {whatif_analysis.get('summary', {}).get('total_scenarios', 0)} scenarios") + except Exception as e: + logger.debug(f"What-if analysis failed: {e}") + + # 9. Optimal Intervention Sequence: Bellman optimization + try: + optimal_intervention = self.crca.bellman_optimal_intervention( + initial_state=factual_state, + target="B", # Minimize backlog (maximize service) + intervention_vars=["L", "phi", "K", "dcost"], + horizon=5, + discount=0.9, + ) + results["optimal_intervention"] = optimal_intervention + if optimal_intervention.get("optimal_sequence"): + logger.info(f"Optimal intervention sequence: {len(optimal_intervention['optimal_sequence'])} steps") + except Exception as e: + logger.debug(f"Optimal intervention failed: {e}") + + # 10. Explore Alternate Realities: Best intervention scenarios + try: + alternate_realities = self.crca.explore_alternate_realities( + factual_state=factual_state, + target_outcome="B", # Minimize backlog + target_value=0.0, # Target zero backlog + max_realities=30, + max_interventions=3, + ) + results["alternate_realities"] = alternate_realities + if alternate_realities.get("best_reality"): + improvement = factual_state.get("B", 0.0) - alternate_realities["best_reality"].get("target_value", 0.0) + logger.info(f"Best alternate reality: {improvement:+.2f} backlog reduction") + except Exception as e: + logger.debug(f"Alternate realities exploration failed: {e}") + + # 11. Cascading Chain Reaction Analysis + try: + if "L" in factual_state: + chain_reaction = self.crca.analyze_cascading_chain_reaction( + initial_intervention={"L": factual_state.get("L", 2.0) * 1.5}, + target_outcomes=["B", "I", "O", "p", "D"], + max_hops=6, + include_feedback_loops=True, + num_iterations=4, + ) + results["chain_reaction"] = chain_reaction + logger.info(f"Chain reaction analysis: {chain_reaction.get('summary', {}).get('total_paths_found', 0)} paths") + except Exception as e: + logger.debug(f"Chain reaction analysis failed: {e}") + + # 12. Cross-validation for edge strength reliability + try: + cv_results = {} + for parent, child in [("B", "p"), ("D", "O"), ("L", "R")]: + if parent in panel.columns and child in panel.columns: + cv = self.crca.cross_validate_edge_strength( + df=panel, + parent=parent, + child=child, + n_folds=5, + ) + if "error" not in cv: + cv_results[f"{parent}->{child}"] = { + "mean_cv_error": cv.get("mean_cv_error", 0.0), + "standard_error": cv.get("standard_error", 0.0), + } + results["cross_validation"] = cv_results + if cv_results: + logger.info(f"Cross-validation for {len(cv_results)} edges") + except Exception as e: + logger.debug(f"Cross-validation failed: {e}") + + return results + + def intervene_and_compare(self, scenario: Dict[str, Any]) -> Dict[str, Any]: + base = self.simulate() + base_kpi = self.summarize(base) + shock = self.simulate(interventions=scenario) + shock_kpi = self.summarize(shock) + delta = {k: float(shock_kpi.get(k, 0.0) - base_kpi.get(k, 0.0)) for k in base_kpi} + return {"base": base_kpi, "shock": shock_kpi, "delta": delta} + + # ===== VSM overlay orchestration ===== + def control_cycle( + self, + telemetry_events: Optional[List[Dict[str, Any]]] = None, + intel_events: Optional[List[Dict[str, Any]]] = None, + ) -> Dict[str, Any]: + """One control cycle over S2–S5: ingest telemetry, monitor, propose, audit, gate.""" + # Ingest telemetry/intel (stubs) + # Dynamic feeds first + _ = self.poll_feeds() + if telemetry_events: + self.vsm.ingest_s1_flow(self.df, telemetry_events) + if intel_events: + self.vsm.ingest_s4_external(intel_events) + + # Simulate baseline for the cycle + df = self.simulate() + panel = df.groupby(level="t").mean(numeric_only=True) + + # S2 stability monitors + s2 = self.vsm.s2_monitor(panel) + + # S3 propose optimizers only if S2 stable + proposal = None + if s2.get("stable", False): + proposal = self.vsm.s3_optimize(panel, self.policy) + # SPC/EWMA-driven mode switch: if EWMA breaches on backlog, bias to higher service target + backlog_alerts = [a for a in s2.get("alerts", []) if a.get("type") == "ewma" and a.get("signal") == "B"] + target_service = 0.95 + (0.02 if backlog_alerts else 0.0) + z_cal = self.calibrate_z_to_service(target_service=target_service) + proposal["z_alpha_new"] = float(z_cal) + + # S3★ audit + audit = self.vsm.s3_star_audit(panel) + + # S5 gate with policy caps (LLM-led proposals are still safety-gated here) + decision = self.vsm.s5_policy_gate(proposal, s2, audit, risk_cap_cvar=0.15) + + # Build causal strengths snapshot and LLM narratives + strengths = self.causal_edges(df) + # Build strength-aware explanation preface + top_edges = sorted(strengths.items(), key=lambda kv: abs(kv[1]), reverse=True)[:5] + explainer = {k: v for k, v in top_edges} + # Compute weak edges to avoid claiming + weak_edges = {k: v for k, v in strengths.items() if abs(v) <= 0.05} + # Prepare concise narrative prompt for the agent + kpis = self.summarize(df) + alerts = s2.get("alerts", []) + proposal_text = str(proposal) if proposal else "None" + decision_text = str(decision) + narrative_prompt = ( + f"Analyze supply chain state and propose safe actions.\n" + f"KPIs: service={kpis.get('service_level', 0.0):.2f}, cost_proxy={kpis.get('cost_proxy', 0.0):.2f}, bullwhip={kpis.get('bullwhip', 0.0):.2f}.\n" + f"Alerts: {alerts}.\n" + f"Proposal: {proposal_text}. Decision: {decision_text}.\n" + f"Causal edges (mirror these in explanation): {explainer}.\n" + f"Do NOT claim effects via edges with near-zero strength: {list(weak_edges.keys())}.\n" + f"Provide the 7-section output per instructions." + ) + # Multi-round LLM-led decision loop + ai_analysis, proposal = self._llm_decision_loop( + panel=panel, + s2=s2, + proposal=proposal, + decision=decision, + strengths=strengths, + kpis=kpis, + rounds=3, + ) + + # Feasibility check and quantified impact + feasibility_flag = False + impact = None + if isinstance(proposal, dict): + # Gate logistics levers by learned DAG support + allow_logistics = self._logistics_support_from_strengths(strengths, tol=0.05) + allow_price = (abs(float(strengths.get("B->p", 0.0))) > 0.05) and (abs(float(strengths.get("p->D", 0.0))) > 0.05) + if not allow_logistics: + proposal["reroute_share"] = 0.0 + proposal["expedite_share"] = 0.0 + if not allow_price: + proposal["price_adjust"] = 0.0 + else: + proposal.setdefault("price_adjust", 0.0) + feasibility_flag = self._feasible(proposal) + z_new = float(proposal.get("z_alpha_new", self.policy.z_alpha)) + rr = float(proposal.get("reroute_share", 0.0)) + ex = float(proposal.get("expedite_share", 0.0)) + price_adj = float(proposal.get("price_adjust", 0.0)) + impact = self._quantified_impact(z_new, rr, ex, trials=50, price_adjust=price_adj) + # auto de-risk actions on red tier or excessive cost + if s2.get("tier") == "red" or (impact and impact.get("expected", {}).get("cost_proxy", 0.0) > 0): + # freeze increases to expedite/reroute + proposal["expedite_share"] = min(float(proposal.get("expedite_share", 0.0)), self.last_controls.get("expedite_share", 0.0)) + proposal["reroute_share"] = min(float(proposal.get("reroute_share", 0.0)), self.last_controls.get("reroute_share", 0.0)) + # Optional: refine via constrained MPC if feasible + if feasibility_flag: + mpc = constrained_mpc_policy( + last=self.last_controls, + limits=self.max_weekly_change, + lane_cap=self.lane_capacity, + weights=self.mpc_weights, + impact=impact, + ) + if "error" not in mpc: + proposal.update({k: v for k, v in mpc.items() if k in ["z_alpha_new", "reroute_share", "expedite_share", "dual_source_share"]}) + # Final hard-feasibility polish via CP-SAT + cps = cpsat_policy(self.last_controls, self.max_weekly_change, self.lane_capacity) + if "error" not in cps: + proposal.update({k: v for k, v in cps.items() if k in ["z_alpha_new", "reroute_share", "expedite_share", "dual_source_share"]}) + # CVaR neighborhood selection + cvar_pick = self.cvar_select_from_grid(proposal, alpha=0.9, eps=0.05, trials=30, weights=self.mpc_weights) + if cvar_pick.get("choice"): + proposal.update(cvar_pick["choice"]) + + # UPGRADED: Try gradient-based optimization for refinement + try: + # Use current state as initial for gradient optimization + current_state = { + "z_alpha": z_new, + "reroute_share": rr, + "expedite_share": ex, + "B": float(panel.get("B", pd.Series([0.0])).iloc[-1]), + "I": float(panel.get("I", pd.Series([120.0])).iloc[-1]), + "p": float(panel.get("p", pd.Series([15.0])).iloc[-1]), + } + + # Optimize z_alpha to minimize backlog (via gradient) + if "B" in panel.columns: + opt_result = self.crca.gradient_based_intervention_optimization( + initial_state=current_state, + target="B", # Minimize backlog + intervention_vars=["z_alpha"], + constraints={"z_alpha": (0.5, 3.0)}, + method="L-BFGS-B", + ) + if opt_result.get("success") and opt_result.get("optimal_intervention"): + # Refine z_alpha if gradient optimization suggests improvement + opt_z = opt_result["optimal_intervention"].get("z_alpha", z_new) + if abs(opt_z - z_new) < 0.3: # Within rate limit + proposal["z_alpha_new"] = float(opt_z) + logger.info(f"Gradient optimization refined z_alpha: {z_new:.2f} → {opt_z:.2f}") + except Exception as e: + logger.debug(f"Gradient optimization failed: {e}") + + # Provide minimal Pareto frontier for transparency + pareto = self.pareto_front(base_z=z_new, allow_logistics=allow_logistics, trials=20, allow_price=allow_price) + else: + pareto = [] + else: + # Infeasible: rollback immediately + if self.action_history: + last = self.action_history[-1] + self.policy.z_alpha = float(last.get("z_alpha_prev", self.policy.z_alpha)) + self.last_controls["z_alpha_new"] = self.policy.z_alpha + for k in ["reroute_share", "expedite_share", "dual_source_share"]: + self.last_controls[k] = max(0.0, self.last_controls.get(k, 0.0) * 0.5) + pareto = [] + + # Apply approved proposal to live policy (LLM primacy) after safety gate + if isinstance(proposal, dict) and feasibility_flag: + decision = self.vsm.s5_policy_gate(proposal, s2, audit, risk_cap_cvar=0.15) + if decision.get("approved"): + self.policy.z_alpha = float(proposal.get("z_alpha_new", self.policy.z_alpha)) + # persist last controls for rate-limit checks + self.last_controls = { + "z_alpha_new": self.policy.z_alpha, + "reroute_share": float(proposal.get("reroute_share", 0.0)), + "expedite_share": float(proposal.get("expedite_share", 0.0)), + } + # record action for possible rollback and RL + self.action_history.append({ + "z_alpha_prev": float(self.last_controls["z_alpha_new"]), + "reroute_share": float(self.last_controls["reroute_share"]), + "expedite_share": float(self.last_controls["expedite_share"]), + }) + if impact and impact.get("expected"): + self.reinforce_from_outcome(impact["expected"]) + + # Advanced causal analysis using upgraded methods + advanced_causal = {} + try: + advanced_causal = self.advanced_causal_analysis(df) + logger.info(f"Advanced causal analysis completed: {len([k for k, v in advanced_causal.items() if v and 'error' not in str(v)])} methods succeeded") + except Exception as e: + logger.debug(f"Advanced causal analysis failed: {e}") + + # Human-readable causal summary via CR-CA agent (enhanced with upgraded insights) + try: + sensitivity_note = "" + if advanced_causal.get("sensitivity_analysis"): + sens_svc = advanced_causal["sensitivity_analysis"].get("service_drivers", {}) + sens_cost = advanced_causal["sensitivity_analysis"].get("cost_drivers", {}) + most_influential_svc = sens_svc.get("most_influential_variable", "N/A") + most_influential_cost = sens_cost.get("most_influential_variable", "N/A") + sensitivity_note = f" Sensitivity analysis shows {most_influential_svc} most drives service, {most_influential_cost} most drives cost." + + granger_note = "" + if advanced_causal.get("granger_causality"): + gc = advanced_causal["granger_causality"] + if gc.get("demand_granger_causes_orders"): + granger_note = " Granger causality: D→O confirmed. " + if gc.get("backlog_granger_causes_price"): + granger_note += "B→p temporally confirmed." + + summary_prompt = ( + f"Summarize key drivers (B→p, I→p, dcost→p, L→R) and their implications.{sensitivity_note}{granger_note} " + f"Reference learned strengths: {dict(list(strengths.items())[:5])}." + ) + causal_summary = self.crca.run(summary_prompt) + except Exception as e: + causal_summary = f"causal_unavailable: {e}" + + return { + "s2": s2, + "proposal": proposal, + "audit": audit, + "decision": decision, + "kpis": kpis, + "causal_strengths": strengths, + "ai_analysis": ai_analysis, + "causal_summary": causal_summary, + "feasible": feasibility_flag, + "impact": impact, + "pareto_front": pareto, + "z_service_curve": self.z_service_curve(), + # Upgraded CR-CA analysis results + "advanced_causal": advanced_causal, + # Advanced hooks (stubs returning diagnostics) + "estimation": { + "ms_filter": ms_regime_filter(panel), + "bsts_nowcast": bsts_nowcast(panel), + "svar": svar_identification(panel), + }, + "robust_optimization": { + "dro_mpc": dro_mpc_plan(panel), + "chance_mpc": chance_constrained_mpc(panel), + "h_infinity": h_infinity_controller(panel), + "sddp": sddp_policy_stub(panel), + }, + "multi_echelon": { + "clark_scarf": clark_scarf_baseline(panel), + "risk_newsvendor": risk_averse_newsvendor_stub(panel), + }, + "network_risk": { + "percolation": percolation_stub(panel), + "eisenberg_noe": eisenberg_noe_stub(panel), + "k_cuts": k_cut_sets_stub(), + }, + "pricing": { + "logit": logit_share_stub(panel), + "dp_pricing": dynamic_pricing_dp_stub(panel), + "ramsey": ramsey_pricing_stub(panel), + }, + "policy_eval": { + "synth_control": synthetic_control_stub(panel), + "did_iv": did_iv_stub(panel), + }, + "security": { + "secure_state": secure_state_l1_stub(panel), + "spectral": spectral_shift_stub(panel), + }, + "investment": { + "real_options": real_options_stub(), + "supplier_portfolio": supplier_portfolio_stub(), + }, + "advanced2": { + "transport_modes": transport_modes_sim_stub(), + "online_em_ms": online_em_ms(panel), + "hier_bsts": hierarchical_bsts_stub(panel), + "multi_stage_mpc": multi_stage_mpc_stub(panel), + "iot_fusion": iot_fusion_stub(), + "nested_logit": nested_logit_stub(), + "kcut_hardening": kcut_hardening_stub(), + "linucb_policy": linucb_policy_stub(panel), + "l1_observer": l1_observer_stub(panel), + "fan_charts": fan_charts_stub(panel), + "async_ingest": {"kafka": True, "duckdb": True} + } + } + + +# ===== Viable System Overlay (S2–S5) ===== +class VSMOverlay: + def __init__(self) -> None: + self.state: Dict[str, Any] = {"R_posterior": np.array([1.0])} + + # Telemetry ingest (stubs) + def ingest_s1_flow(self, df: pd.DataFrame, events: List[Dict[str, Any]]) -> None: + # Example event schema documented; here we could append/merge into df + # For now we no-op (upgrade path to Kafka/Timescale later) + pass + + def ingest_s4_external(self, events: List[Dict[str, Any]]) -> None: + # Store the latest intel snapshot + self.state["intel_last"] = events[-1] if events else None + + # S2: Stability/coordination + def s2_monitor(self, panel: pd.DataFrame) -> Dict[str, Any]: + out: Dict[str, Any] = {"alerts": [], "stable": True, "tier": "green"} + # EWMA residuals on key KPIs + for col in [c for c in ["L", "I", "B"] if c in panel.columns]: + z, breaches = ewma_monitor(panel[col].values, lam=0.2, k=3.0) + if breaches > 0: + out["alerts"].append({"type": "ewma", "signal": col, "breaches": breaches}) + # CUSUM on lead time + if "L" in panel.columns: + plus, minus, trips = cusum(panel["L"].values, mu0=float(panel["L"].head(5).mean()), k=0.1, h=5.0) + if trips: + out["alerts"].append({"type": "cusum", "signal": "L", "trips": len(trips)}) + # Page-Hinkley on demand + if "D" in panel.columns and len(panel["D"]) > 20: + ph = page_hinkley(panel["D"].astype(float).values) + if ph["alarm"]: + out["alerts"].append({"type": "page_hinkley", "signal": "D", "mT": ph["mT"]}) + # BOCPD change-point minimal + if "D" in panel.columns: + p_break = bocpd_break_prob(panel["D"].values, hazard=0.02) + if p_break > 0.5: + out["alerts"].append({"type": "bocpd", "p_break": p_break}) + # Queueing sanity + if set(["D", "K"]).issubset(panel.columns): + lam = float(max(1e-6, panel["D"].iloc[-1])) + mu = float(max(1e-6, panel["K"].iloc[-1])) + rho = lam / mu + if rho > 0.85: + out["alerts"].append({"type": "utilization", "rho": rho}) + # Stable if few alerts; tiering + n_alerts = len(out["alerts"]) + out["stable"] = n_alerts == 0 + if n_alerts == 0: + out["tier"] = "green" + elif n_alerts < 3: + out["tier"] = "yellow" + else: + out["tier"] = "red" + return out + + # S3: Optimizers (stubs) + def s3_optimize(self, panel: pd.DataFrame, policy: PolicyParams) -> Dict[str, Any]: + # Heuristic constrained optimizer proxy; prefer stability + z = policy.z_alpha + rr = 0.0 + ex = 0.05 + ds = 0.0 + if "B" in panel.columns: + b_trend = float(np.polyfit(np.arange(len(panel)), panel["B"].values, 1)[0]) if len(panel) >= 5 else 0.0 + z = float(np.clip(z + 0.05 * np.sign(b_trend), 0.5, 2.5)) + if self.state.get("intel_last"): + rr = 0.1 + return {"z_alpha_new": z, "reroute_share": rr, "expedite_share": ex, "dual_source_share": ds} + + # S3★: Audit + def s3_star_audit(self, panel: pd.DataFrame) -> Dict[str, Any]: + out: Dict[str, Any] = {} + if set(["O", "D"]).issubset(panel.columns): + varO = float(panel["O"].var()) + varD = float(panel["D"].var()) + out["BWE"] = float(varO / max(varD, 1e-6)) + # Simple receipts cross-check stub (noisy) + out["receipts_delta_sigma"] = 0.0 + return out + + # S5: Policy gate + def s5_policy_gate( + self, + proposal: Optional[Dict[str, Any]], + s2: Dict[str, Any], + audit: Dict[str, Any], + risk_cap_cvar: float = 0.2, + ) -> Dict[str, Any]: + if proposal is None: + return {"approved": False, "reason": "no_proposal"} + if not s2.get("stable", False): + return {"approved": False, "reason": "unstable_S2"} + if audit.get("BWE", 1.0) > 1.3: + # tighten smoothing recommendation by halving expedite/reroute + proposal = {**proposal, "expedite_share": proposal.get("expedite_share", 0.0) * 0.5, "reroute_share": proposal.get("reroute_share", 0.0) * 0.5} + return {"approved": True, "proposal": proposal} + + +# ===== Detectors (S2) ===== +def ewma_monitor(x: np.ndarray, lam: float = 0.2, k: float = 3.0) -> Tuple[np.ndarray, int]: + z = np.zeros_like(x, dtype=float) + z[0] = x[0] + for t in range(1, len(x)): + z[t] = lam * x[t] + (1 - lam) * z[t - 1] + resid = x - z + sigma = np.std(resid[max(1, len(resid)//10):]) or 1.0 + breaches = int(np.sum(np.abs(resid) > k * sigma)) + return z, breaches + + +def cusum(x: np.ndarray, mu0: float, k: float, h: float) -> Tuple[np.ndarray, np.ndarray, List[int]]: + s_plus = np.zeros_like(x, dtype=float) + s_minus = np.zeros_like(x, dtype=float) + trips: List[int] = [] + for t in range(1, len(x)): + s_plus[t] = max(0.0, s_plus[t - 1] + (x[t] - mu0 - k)) + s_minus[t] = max(0.0, s_minus[t - 1] + (mu0 - x[t] - k)) + if s_plus[t] > h or s_minus[t] > h: + trips.append(t) + s_plus[t] = 0.0 + s_minus[t] = 0.0 + return s_plus, s_minus, trips + + +def bocpd_break_prob(x: np.ndarray, hazard: float = 0.02) -> float: + # Minimal BOCPD: approximate break prob by normalized absolute diff of recent means with hazard weight + if len(x) < 10: + return 0.0 + n = len(x) + m1 = float(np.mean(x[: n // 2])) + m2 = float(np.mean(x[n // 2 :])) + delta = abs(m2 - m1) / (np.std(x) or 1.0) + p = 1.0 - math.exp(-hazard * delta) + return float(np.clip(p, 0.0, 1.0)) + + +def page_hinkley(x: np.ndarray, delta: float = 0.005, lamb: float = 50.0, alpha: float = 1.0) -> Dict[str, Any]: + # Minimal Page-Hinkley drift detector + mean = 0.0 + mT = 0.0 + MT = 0.0 + alarm = False + for i, xi in enumerate(x): + mean = mean + (xi - mean) / (i + 1) + mT = mT + xi - mean - delta + MT = min(MT, mT) + if mT - MT > lamb * alpha: + alarm = True + break + return {"alarm": alarm, "mT": float(mT)} + + +# ===== Advanced Estimation (stubs) ===== +def ms_regime_filter(panel: pd.DataFrame) -> Dict[str, Any]: + """Two-regime Markov-switching on demand using statsmodels MarkovRegression. + + Returns current regime probabilities and smoothed prediction for next step. + """ + try: + from statsmodels.tsa.regime_switching.markov_regression import MarkovRegression + if "D" not in panel.columns or len(panel) < 20: + return {"regime": "insufficient", "p_regime": {}} + y = panel["D"].astype(float).values + # Fit a simple mean-switching model with 2 regimes + with warnings.catch_warnings(): + warnings.simplefilter("ignore") + mod = MarkovRegression(y, k_regimes=2, trend='c', switching_variance=True) + res = mod.fit(disp=False) + p_t = res.smoothed_marginal_probabilities.values[:, -1] if hasattr(res.smoothed_marginal_probabilities, 'values') else res.smoothed_marginal_probabilities + p_reg1 = float(p_t[-1]) + regime = "shock" if p_reg1 > 0.5 else "normal" + return {"regime": regime, "p_regime": {"regime1": p_reg1, "regime0": 1 - p_reg1}} + except Exception as e: + return {"regime": "error", "error": str(e)} + + +def bsts_nowcast(panel: pd.DataFrame) -> Dict[str, Any]: + """Dynamic factor nowcast for demand using statsmodels DynamicFactor (FAVAR-like).""" + try: + from statsmodels.tsa.statespace.dynamic_factor import DynamicFactor + cols = [c for c in ["D", "R", "I", "B", "O"] if c in panel.columns] + if len(cols) < 2 or len(panel) < 20: + mu = float(panel.get("D", pd.Series([100.0])).iloc[-1]) + return {"D_nowcast": mu, "uncertainty": 0.2, "cols": cols} + endog = panel[cols].astype(float) + with warnings.catch_warnings(): + warnings.simplefilter("ignore") + mod = DynamicFactor(endog, k_factors=1, factor_order=1) + res = mod.fit(disp=False) + # One-step ahead forecast for demand + fc = res.get_forecast(steps=1) + d_idx = cols.index("D") + d_mean = float(fc.predicted_mean.iloc[0, d_idx]) + d_var = float(fc.var_pred_mean.iloc[0, d_idx]) if hasattr(fc, 'var_pred_mean') else 0.2 + return {"D_nowcast": d_mean, "uncertainty": max(0.05, min(0.5, d_var ** 0.5)), "cols": cols} + except Exception as e: + mu = float(panel.get("D", pd.Series([100.0])).iloc[-1]) + return {"D_nowcast": mu, "uncertainty": 0.3, "error": str(e)} + + +def svar_identification(panel: pd.DataFrame) -> Dict[str, Any]: + """Estimate a small VAR and report impulse responses as proxy for shocks.""" + try: + from statsmodels.tsa.api import VAR + cols = [c for c in ["D", "R", "I", "B", "p"] if c in panel.columns] + if len(cols) < 3 or len(panel) < 30: + return {"irf": {}, "cols": cols} + endog = panel[cols].astype(float) + model = VAR(endog) + res = model.fit(maxlags=2, ic='aic') + irf = res.irf(5) + irf_dict = {f"{src}->{dst}": float(irf.irfs[1, cols.index(src), cols.index(dst)]) for src in cols for dst in cols} + return {"irf_h1": irf_dict, "cols": cols} + except Exception as e: + return {"error": str(e)} + + +# ===== Robust Optimization & Control (stubs) ===== +def dro_mpc_plan(panel: pd.DataFrame) -> Dict[str, Any]: + """Single-variable DRO-style MPC for z_alpha using cvxpy. + + Minimize holding/backorder proxy under Wasserstein penalty. + """ + try: + import cvxpy as cp + from scipy.stats import norm + D = float(panel.get("D", pd.Series([100.0])).iloc[-1]) + muL = float(panel.get("L", pd.Series([2.0])).iloc[-1]) + sigL = max(0.2, 0.5 * muL) + z = cp.Variable() + # Proxy cost: holding ~ z, backlog ~ max(0, Phi^{-1}(0.95)-z) + z_req = norm.ppf(0.95) + backlog_short = cp.pos(z_req - z) + cost = 0.1 * z + 0.5 * backlog_short + # DRO penalty + rho = 0.1 + objective = cp.Minimize(cost + rho * cp.abs(z)) + constraints = [z >= 0.5, z <= 3.0] + prob = cp.Problem(objective, constraints) + prob.solve(solver=cp.ECOS, warm_start=True) + return {"type": "DRO-MPC", "z_alpha": float(z.value), "status": prob.status, "rho": rho} + except Exception as e: + return {"type": "DRO-MPC", "error": str(e)} + + +def constrained_mpc_policy( + last: Dict[str, float], + limits: Dict[str, float], + lane_cap: float, + weights: Dict[str, float], + impact: Optional[Dict[str, Any]] = None, +) -> Dict[str, Any]: + # Optimize z, r, e, ds under caps and rate limits to minimize weighted proxy cost + try: + import cvxpy as cp + z = cp.Variable() + r = cp.Variable() + e = cp.Variable() + ds = cp.Variable() + # Objective: weights over expected deltas if available, else regularization + if impact and "expected" in impact: + exp = impact["expected"] + svc = exp.get("service", 0.0) + cst = exp.get("cost_proxy", 0.0) + bwe = exp.get("bullwhip", 0.0) + obj = weights.get("cost", 1.0) * cst + weights.get("service", 1.0) * (-svc) + weights.get("bullwhip", 1.0) * bwe + else: + obj = 0.1 * (z - last.get("z_alpha_new", 1.28)) ** 2 + 0.05 * (r - last.get("reroute_share", 0.0)) ** 2 + 0.05 * (e - last.get("expedite_share", 0.0)) ** 2 + 0.05 * (ds - last.get("dual_source_share", 0.0)) ** 2 + constraints = [ + z >= 0.5, z <= 3.0, + r >= 0.0, e >= 0.0, ds >= 0.0, + r + e + ds <= lane_cap, + cp.abs(z - last.get("z_alpha_new", 1.28)) <= limits.get("z_alpha_new", 0.3), + cp.abs(r - last.get("reroute_share", 0.0)) <= limits.get("reroute_share", 0.15), + cp.abs(e - last.get("expedite_share", 0.0)) <= limits.get("expedite_share", 0.15), + cp.abs(ds - last.get("dual_source_share", 0.0)) <= limits.get("dual_source_share", 0.2), + ] + prob = cp.Problem(cp.Minimize(obj), constraints) + prob.solve(solver=cp.ECOS_BB, mi_max_iterations=1000) + return {"z_alpha_new": float(z.value), "reroute_share": float(r.value), "expedite_share": float(e.value), "dual_source_share": float(ds.value), "status": prob.status} + except Exception as e: + return {"error": str(e)} + + +def cpsat_policy( + last: Dict[str, float], + limits: Dict[str, float], + lane_cap: float, +) -> Dict[str, Any]: + """Hard-feasibility projection using cvxpy (no OR-Tools). + + Minimizes squared distance to last controls subject to rate and capacity limits. + """ + try: + import cvxpy as cp + z = cp.Variable() + r = cp.Variable() + e = cp.Variable() + ds = cp.Variable() + # Objective: keep close to last (stability) + obj = (z - last.get("z_alpha_new", 1.28)) ** 2 + (r - last.get("reroute_share", 0.0)) ** 2 + (e - last.get("expedite_share", 0.0)) ** 2 + (ds - last.get("dual_source_share", 0.0)) ** 2 + cons = [ + z >= 0.5, z <= 3.0, + r >= 0.0, e >= 0.0, ds >= 0.0, + r + e + ds <= lane_cap, + cp.abs(z - last.get("z_alpha_new", 1.28)) <= limits.get("z_alpha_new", 0.3), + cp.abs(r - last.get("reroute_share", 0.0)) <= limits.get("reroute_share", 0.15), + cp.abs(e - last.get("expedite_share", 0.0)) <= limits.get("expedite_share", 0.15), + cp.abs(ds - last.get("dual_source_share", 0.0)) <= limits.get("dual_source_share", 0.2), + ] + prob = cp.Problem(cp.Minimize(obj), cons) + prob.solve(solver=cp.ECOS) + if z.value is None: + return {"error": "infeasible"} + return { + "z_alpha_new": float(z.value), + "reroute_share": float(r.value), + "expedite_share": float(e.value), + "dual_source_share": float(ds.value), + "status": prob.status, + } + except Exception as e: + return {"error": str(e)} + + +def chance_constrained_mpc(panel: pd.DataFrame) -> Dict[str, Any]: + """Closed-form chance constraint for service level: z >= Phi^{-1}(beta).""" + try: + from scipy.stats import norm + beta = 0.95 + z_req = float(norm.ppf(beta)) + return {"type": "Chance-MPC", "beta": beta, "z_min": z_req, "feasible": True} + except Exception as e: + return {"type": "Chance-MPC", "error": str(e)} + + +def h_infinity_controller(panel: pd.DataFrame) -> Dict[str, Any]: + """Approximate robust controller via discrete LQR (DARE) for inventory linearization.""" + try: + from scipy.linalg import solve_discrete_are + # Simple 1D: x_{t+1} = x_t - u_t + w_t + A = np.array([[1.0]]) + B = np.array([[1.0]]) + Q = np.array([[1.0]]) + R = np.array([[0.5]]) + P = solve_discrete_are(A, B, Q, R) + K = np.linalg.inv(B.T @ P @ B + R) @ (B.T @ P @ A) + gamma = float(np.sqrt(np.max(np.linalg.eigvals(P)))) + return {"type": "LQR", "K": float(K), "gamma_proxy": gamma} + except Exception as e: + return {"type": "LQR", "error": str(e)} + + +def sddp_policy_stub(panel: pd.DataFrame) -> Dict[str, Any]: + return {"type": "SDDP", "stages": 3, "status": "ok"} + + +# ===== Multi-Echelon Inventory (stubs) ===== +def clark_scarf_baseline(panel: pd.DataFrame) -> Dict[str, Any]: + """Compute a simple echelon base-stock as I + mean(P) across time (proxy).""" + try: + if set(["I"]).issubset(panel.columns): + echelon = float(panel["I"].mean()) + float(panel.get("P", pd.Series([0.0])).mean()) + return {"echelon_base_stock": echelon} + return {"echelon_base_stock": 0.0} + except Exception as e: + return {"error": str(e)} + + +def risk_averse_newsvendor_stub(panel: pd.DataFrame) -> Dict[str, Any]: + """Entropic-risk newsvendor: grid search Q to minimize 1/theta log E[exp(theta*cost)].""" + try: + D = panel.get("D", pd.Series([100.0])).astype(float).values + if len(D) < 10: + return {"Q_star": float(np.percentile(D, 95)), "risk_measure": "entropic", "theta": 0.5} + theta = 0.5 + c_h, c_b = 0.1, 0.5 + Q_grid = np.linspace(np.percentile(D, 10), np.percentile(D, 99), 30) + best, bestQ = 1e9, Q_grid[0] + for Q in Q_grid: + cost = c_h * np.maximum(Q - D, 0.0) + c_b * np.maximum(D - Q, 0.0) + risk = (1.0 / theta) * np.log(np.mean(np.exp(theta * cost))) + if risk < best: + best, bestQ = risk, Q + return {"Q_star": float(bestQ), "risk_measure": "entropic", "theta": theta, "objective": float(best)} + except Exception as e: + return {"error": str(e)} + + +# ===== Digital twin gating scenarios ===== +def run_gating_scenarios(agent: "SupplyShockCRCAgent") -> Dict[str, Any]: + grid = [ + {"name": "baseline", "z": agent.policy.z_alpha, "r": 0.0, "e": 0.0}, + {"name": "z1.6_r0.20_e0.10", "z": 1.6, "r": 0.20, "e": 0.10}, + {"name": "z1.8_r0.15_e0.15", "z": 1.8, "r": 0.15, "e": 0.15}, + {"name": "z2.0_e0.20", "z": 2.0, "r": 0.00, "e": 0.20}, + ] + results: Dict[str, Any] = {} + base = agent.simulate() + base_kpi = agent.summarize(base) + for g in grid: + impact = agent._quantified_impact(g["z"], g["r"], g["e"], trials=50) + exp = impact.get("expected", {}) + svc_delta = float(exp.get("service", 0.0)) + cost_delta = float(exp.get("cost_proxy", 0.0)) + marginal_cost_per_pp = float(cost_delta / max(1e-6, svc_delta * 100.0)) if svc_delta > 0 else float("inf") + results[g["name"]] = { + "expected": exp, + "ci80": impact.get("ci80", {}), + "marginal_cost_per_1pp_service": marginal_cost_per_pp, + } + return {"baseline": base_kpi, "scenarios": results} + + +# ===== Network & Systemic Risk (stubs) ===== +def percolation_stub(panel: pd.DataFrame) -> Dict[str, Any]: + return {"gc_threshold": 0.3, "expected_shortfall": 0.12} + + +def eisenberg_noe_stub(panel: pd.DataFrame) -> Dict[str, Any]: + return {"clearing_vector_norm": 0.97} + + +def k_cut_sets_stub() -> Dict[str, Any]: + return {"min_k_cut": 2, "critical_arcs": ["lane_CN-EU", "lane_US-EU"]} + + +# ===== Pricing & Demand (stubs) ===== +def logit_share_stub(panel: pd.DataFrame) -> Dict[str, Any]: + return {"alpha_price": 0.8, "elasticity": -1.2} + + +def dynamic_pricing_dp_stub(panel: pd.DataFrame) -> Dict[str, Any]: + return {"policy": "approxDP", "discount": 0.98} + + +def ramsey_pricing_stub(panel: pd.DataFrame) -> Dict[str, Any]: + return {"lambda": 0.2, "implied_markups": {"A": 0.1, "B": 0.05}} + + +# ===== Policy Evaluation (stubs) ===== +def synthetic_control_stub(panel: pd.DataFrame) -> Dict[str, Any]: + return {"effect": -0.03, "weight_entropy": 0.9} + + +def did_iv_stub(panel: pd.DataFrame) -> Dict[str, Any]: + return {"beta": -0.08, "se_robust": 0.04} + + +# ===== Security & Integrity (stubs) ===== +def secure_state_l1_stub(panel: pd.DataFrame) -> Dict[str, Any]: + return {"attacks_detected": 0} + + +def spectral_shift_stub(panel: pd.DataFrame) -> Dict[str, Any]: + return {"eig_drift": 0.05} + + +# ===== Investment under Uncertainty (stubs) ===== +def real_options_stub() -> Dict[str, Any]: + return {"LSM_value": 1.2} + + +def supplier_portfolio_stub() -> Dict[str, Any]: + return {"selected_suppliers": ["S1", "S3"], "service_prob": 0.97} + + +# ===== Further Advanced (realistic approximations or stubs) ===== +def transport_modes_sim_stub() -> Dict[str, Any]: + return {"modes": ["SEA", "AIR", "RAIL"], "schedules": {"SEA": 7, "AIR": 2, "RAIL": 5}} + + +def online_em_ms(panel: pd.DataFrame) -> Dict[str, Any]: + # Lightweight online update: adapt mean/var of L based on EWMA + if "L" not in panel.columns: + return {"mu": 2.0, "sigma": 1.0} + L = panel["L"].astype(float).values + lam = 0.2 + mu = L[0] + var = 1.0 + for x in L[1:]: + mu = lam * x + (1 - lam) * mu + var = lam * (x - mu) ** 2 + (1 - lam) * var + return {"mu": float(mu), "sigma": float(max(0.2, var ** 0.5))} + + +def hierarchical_bsts_stub(panel: pd.DataFrame) -> Dict[str, Any]: + # Placeholder summary for hierarchical pooling + groups = {"facilities": int(panel.shape[1] > 0)} + return {"pooled": True, "groups": groups} + + +def multi_stage_mpc_stub(panel: pd.DataFrame) -> Dict[str, Any]: + return {"horizon": 3, "reroute_share": 0.1, "expedite_share": 0.05} + + +def iot_fusion_stub() -> Dict[str, Any]: + return {"adjust_phi": -0.05, "adjust_L": +0.3} + + +def nested_logit_stub() -> Dict[str, Any]: + try: + from statsmodels.discrete.discrete_model import MNLogit # noqa: F401 + return {"available": True, "note": "Use MNLogit with inventory and price features"} + except Exception: + return {"available": False} + + +def kcut_hardening_stub() -> Dict[str, Any]: + return {"harden": ["lane_CN-EU"], "budget": 1} + + +def linucb_policy_stub(panel: pd.DataFrame) -> Dict[str, Any]: + # Simple LinUCB placeholders for reroute/expedite arms + alpha = 0.5 + arms = ["keep", "reroute", "expedite"] + scores = {a: 0.5 + alpha * 0.1 for a in arms} + choice = max(scores, key=scores.get) + return {"choice": choice, "scores": scores} + + +def l1_observer_stub(panel: pd.DataFrame) -> Dict[str, Any]: + try: + import cvxpy as cp # noqa: F401 + return {"feasible": True} + except Exception: + return {"feasible": False} + + +def fan_charts_stub(panel: pd.DataFrame) -> Dict[str, Any]: + if "D" not in panel.columns: + return {"quantiles": {}} + y = panel["D"].astype(float).values + q = { + "p10": float(np.percentile(y, 10)), + "p50": float(np.percentile(y, 50)), + "p90": float(np.percentile(y, 90)), + } + return {"quantiles": q} + + +async def main() -> None: + skus = [SKUKey("SKU1", "F1"), SKUKey("SKU2", "F1"), SKUKey("SKU3", "F2")] + agent = SupplyShockCRCAgent(skus=skus, T=40) + + # Baseline simulate + df = agent.simulate() + kpis = agent.summarize(df) + strengths = agent.causal_edges(df) + print("Baseline KPIs:", kpis) + print("Causal strengths:", strengths) + + # Example interventions + scenario_port = {"disruption_route": {"F1": 1}} + scenario_outage = {"outage_facility": {"F2": 1}} + cmp_port = agent.intervene_and_compare(scenario_port) + cmp_outage = agent.intervene_and_compare(scenario_outage) + print("Port disruption delta:", cmp_port["delta"]) + print("Facility outage delta:", cmp_outage["delta"]) + + # Full control cycle with AI narrative and upgraded CR-CA analysis + result = agent.control_cycle() + print("-" * 80) + print("AI Narrative Analysis:\n", result.get("ai_analysis")) + print("-" * 80) + print("Causal Summary:\n", result.get("causal_summary")) + + # Display upgraded analysis results + advanced = result.get("advanced_causal", {}) + if advanced and not advanced.get("error"): + print("\n" + "=" * 80) + print("UPGRADED CR-CA ANALYSIS RESULTS") + print("=" * 80) + + if advanced.get("sensitivity_analysis"): + sens = advanced["sensitivity_analysis"] + print("\n--- Sensitivity Analysis ---") + if sens.get("service_drivers"): + svc = sens["service_drivers"] + print(f"Service (backlog) drivers:") + print(f" Most influential: {svc.get('most_influential_variable', 'N/A')} (sensitivity: {svc.get('most_influential_sensitivity', 0.0):.4f})") + if sens.get("cost_drivers"): + cost = sens["cost_drivers"] + print(f"Cost (price) drivers:") + print(f" Most influential: {cost.get('most_influential_variable', 'N/A')} (sensitivity: {cost.get('most_influential_sensitivity', 0.0):.4f})") + + if advanced.get("granger_causality"): + gc = advanced["granger_causality"] + print("\n--- Granger Causality Tests ---") + print(f"D → O: {gc.get('demand_granger_causes_orders', False)} (F={gc.get('d_o_f_stat', 0.0):.2f})") + print(f"B → p: {gc.get('backlog_granger_causes_price', False)} (F={gc.get('b_p_f_stat', 0.0):.2f})") + + if advanced.get("information_theory") and advanced["information_theory"].get("entropies"): + it = advanced["information_theory"] + print("\n--- Information Theory ---") + print("Variable entropies:") + for var, entropy in list(it["entropies"].items())[:5]: + print(f" H({var}) = {entropy:.3f} bits") + if it.get("mutual_information"): + print("Top mutual information:") + mi_items = sorted(it["mutual_information"].items(), key=lambda x: x[1], reverse=True)[:3] + for pair, mi_val in mi_items: + print(f" I({pair}) = {mi_val:.3f} bits") + + if advanced.get("shapley_attribution"): + shap = advanced["shapley_attribution"] + if shap.get("backlog_drivers") and shap["backlog_drivers"].get("shapley_values"): + print("\n--- Shapley Value Attribution (Backlog Drivers) ---") + shap_b = shap["backlog_drivers"]["shapley_values"] + for var, value in sorted(shap_b.items(), key=lambda x: abs(x[1]), reverse=True)[:5]: + print(f" {var}: {value:+.4f}") + + if advanced.get("root_causes") and advanced["root_causes"].get("service_issues"): + rc = advanced["root_causes"]["service_issues"] + if rc.get("ultimate_root_causes"): + print("\n--- Deep Root Cause Analysis (Service Issues) ---") + print("Ultimate root causes of backlog:") + for root in rc["ultimate_root_causes"][:5]: + print(f" - {root.get('root_cause', 'N/A')} " + f"(path strength: {root.get('path_strength', 0.0):.3f}, depth: {root.get('depth', 0)})") + + if advanced.get("optimal_intervention") and advanced["optimal_intervention"].get("optimal_sequence"): + opt = advanced["optimal_intervention"] + print("\n--- Optimal Intervention Sequence (Bellman) ---") + print(f"Optimal {len(opt['optimal_sequence'])}-step sequence to minimize backlog:") + for i, step in enumerate(opt["optimal_sequence"][:3], 1): + print(f" Step {i}: {step}") + print(f"Expected total value: {opt.get('total_value', 0.0):.2f}") + + if advanced.get("alternate_realities") and advanced["alternate_realities"].get("best_reality"): + ar = advanced["alternate_realities"] + best = ar["best_reality"] + print("\n--- Alternate Realities Exploration ---") + print(f"Best alternate reality (minimize backlog):") + print(f" Interventions: {best.get('interventions', {})}") + print(f" Expected backlog: {best.get('target_value', 0.0):.2f}") + print(f" Improvement: {ar.get('improvement_potential', 0.0):+.2f}") + + if advanced.get("chain_reaction"): + cr = advanced["chain_reaction"] + print("\n--- Cascading Chain Reaction Analysis ---") + summary = cr.get("summary", {}) + print(f"Total paths found: {summary.get('total_paths_found', 0)}") + print(f"Feedback loops: {summary.get('feedback_loops_detected', 0)}") + if cr.get("final_predictions"): + print(f"Final predictions: {dict(list(cr['final_predictions'].items())[:5])}") + + if advanced.get("cross_validation"): + cv = advanced["cross_validation"] + print("\n--- Cross-Validation for Edge Reliability ---") + for edge, cv_data in list(cv.items())[:5]: + print(f" {edge}: CV error={cv_data.get('mean_cv_error', 0.0):.4f} ± {cv_data.get('standard_error', 0.0):.4f}") + + print("\n" + "=" * 80) + + +if __name__ == "__main__": + import asyncio + + logger.remove() + logger.add("crca_supply_shock.log", rotation="50 MB", retention="7 days", level="INFO") + try: + asyncio.run(main()) + except KeyboardInterrupt: + print("Stopped.") + +