"""
Generic POMDP Implementation.

This module implements a generic Partially Observable Markov Decision Process (POMDP)
using Active Inference principles. The implementation is designed to be flexible and
handle arbitrary discrete observation and state spaces.

Key Components:
- A matrix: Observation model P(o|s) mapping states to observations
- B matrix: Transition model P(s'|s,a) defining state dynamics under actions
- C matrix: Preference matrix over observations across time
- D matrix: Prior beliefs over initial states
- E matrix: Prior preferences over policies
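
Example:
    Illustrative sketch (the dimensions below are arbitrary and chosen only for
    demonstration; they are not part of the module itself):

    >>> model = GenericPOMDP(num_observations=3, num_states=4, num_actions=2)
    >>> assert model.A.shape == (3, 4) and model.B.shape == (4, 4, 2)
    >>> assert model.C.shape == (3, model.planning_horizon)
    >>> assert np.isclose(model.D.sum(), 1.0) and np.isclose(model.E.sum(), 1.0)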
"""

import numpy as np
import json
from dataclasses import dataclass, field
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Union
import logging
import itertools

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class NumpyEncoder(json.JSONEncoder):
    """JSON encoder that handles numpy types."""
    def default(self, obj):
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        if isinstance(obj, np.integer):
            return int(obj)
        if isinstance(obj, np.floating):
            return float(obj)
        if isinstance(obj, np.bool_):
            return bool(obj)
        return super().default(obj)


@dataclass
class ModelState:
    """Class to hold the current state of the POMDP."""
    beliefs: np.ndarray
    time_step: int = 0
    current_state: Optional[int] = None
    history: Dict[str, List] = field(default_factory=lambda: {
        'observations': [],
        'actions': [],
        'beliefs': [],
        'free_energy': [],
        'efe_components': []  # Store EFE components for visualization
    })

    def __post_init__(self):
        """Validate state after initialization."""
        if len(self.beliefs.shape) != 1:
            raise ValueError("Beliefs must be a 1D array")
        if not np.allclose(self.beliefs.sum(), 1.0):
            raise ValueError("Beliefs must sum to 1")
        if not np.all(self.beliefs >= 0):
            raise ValueError("Beliefs must be non-negative")

    def update(self, observation: int, action: Optional[int], free_energy: float,
               efe_components: Optional[Dict] = None) -> None:
        """Update state and history with new information."""
        if action is not None:
            self.history['actions'].append(int(action))
        self.history['observations'].append(int(observation))
        self.history['beliefs'].append(self.beliefs.copy())
        self.history['free_energy'].append(float(free_energy))
        if efe_components is not None:
            self.history['efe_components'].append(efe_components)
        self.time_step += 1


class GenericPOMDP:
    """Generic POMDP implementation using Active Inference.

    This class implements a Partially Observable Markov Decision Process using
    Active Inference principles. It maintains beliefs over hidden states and
    updates them based on observations and actions.

    Key Features:
    - Belief updating using variational inference
    - Action selection using Expected Free Energy minimization
    - Temporal preference learning
    - Numerical stability handling
    - State/history saving and loading

    Matrix Dimensions:
    - A: (num_observations, num_states) - Column stochastic
    - B: (num_states, num_states, num_actions) - Column stochastic per action
    - C: (num_observations, planning_horizon) - Preference values
    - D: (num_states,) - Initial belief distribution
    - E: (num_actions,) - Policy prior distribution

    Attributes:
        num_observations (int): Number of possible observations
        num_states (int): Number of hidden states
        num_actions (int): Number of possible actions
        planning_horizon (int): Number of timesteps to plan ahead
        max_policies (int): Maximum number of policies to evaluate
        temperature (float): Temperature parameter for action selection
        learning_rate (float): Learning rate for belief updates
        stability_threshold (float): Small constant for numerical stability
        state (ModelState): Current state of the POMDP
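
    Example:
        Illustrative sketch (a small model with arbitrary dimensions, not taken
        from the original code):

        >>> pomdp = GenericPOMDP(num_observations=3, num_states=3, num_actions=2)
        >>> observation, free_energy = pomdp.step()
        >>> assert 0 <= observation < pomdp.num_observations
        >>> assert np.isclose(pomdp.state.beliefs.sum(), 1.0)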
    """

    def __init__(
        self,
        num_observations: int,
        num_states: int,
        num_actions: int,
        planning_horizon: int = 4,
        max_policies: int = 1000,
        temperature: float = 1.0,
        learning_rate: float = 0.1,
        stability_threshold: float = 1e-16
    ):
        """Initialize the POMDP.

        Args:
            num_observations: Number of possible observations
            num_states: Number of hidden states
            num_actions: Number of possible actions
            planning_horizon: Number of timesteps to plan ahead (default: 4)
            max_policies: Maximum number of policies to evaluate
            temperature: Temperature parameter for action selection.
                Higher values lead to more exploration
            learning_rate: Learning rate for belief updates.
                Higher values lead to faster but potentially less stable updates
            stability_threshold: Small constant for numerical stability.
                Used to prevent division by zero and log(0)

        Raises:
            ValueError: If any dimension is not positive
        """
        # Validate inputs
        if any(x <= 0 for x in [num_observations, num_states, num_actions]):
            raise ValueError("Dimensions must be positive integers")

        self.num_observations = num_observations
        self.num_states = num_states
        self.num_actions = num_actions
        self.planning_horizon = planning_horizon
        self.max_policies = max_policies
        self.temperature = temperature
        self.learning_rate = learning_rate
        self.stability_threshold = stability_threshold

        # Initialize matrices
        self._initialize_matrices()

        # Initialize state
        self.state = ModelState(
            current_state=0,
            beliefs=self.D.copy(),
            time_step=0
        )
        self.state.history['beliefs'].append(self.state.beliefs.copy())

    def _initialize_matrices(self):
        """Initialize the model matrices with reasonable defaults."""
        # A matrix: Observation model P(o|s)
        self.A = np.random.rand(self.num_observations, self.num_states)
        self.A = self._normalize(self.A)  # Column stochastic

        # B matrix: Transition model P(s'|s,a)
        self.B = np.zeros((self.num_states, self.num_states, self.num_actions))
        for a in range(self.num_actions):
            B_a = np.eye(self.num_states) * 0.8  # Identity-based with high self-transition
            B_a += np.random.rand(self.num_states, self.num_states) * 0.2  # Add random transitions
            self.B[:, :, a] = self._normalize(B_a)  # Column stochastic

        # C matrix: Preferences over observations (fixed through time)
        self.C = np.zeros((self.num_observations, self.planning_horizon))
        # Set strong preference for observation 0
        base_preferences = np.zeros(self.num_observations)
        base_preferences[0] = 2.0  # Strong positive preference for observation 0
        base_preferences[1:] = -0.5  # Slight negative preference for other observations
        for t in range(self.planning_horizon):
            self.C[:, t] = base_preferences

        # D matrix: Prior beliefs over states
        self.D = np.ones(self.num_states) / self.num_states  # Uniform prior

        # E matrix: Prior over policies
        self.E = np.ones(self.num_actions) / self.num_actions  # Uniform prior

    def _normalize(self, x: np.ndarray) -> np.ndarray:
        """Normalize array to sum to 1 along first axis with numerical stability.

        Args:
            x: Array to normalize

        Returns:
            Normalized array
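
        Example:
            Illustrative sketch (the matrix below is an arbitrary stand-in): a
            random 2D array becomes column stochastic after normalization.

            >>> m = GenericPOMDP(num_observations=2, num_states=2, num_actions=2)
            >>> column_sums = m._normalize(np.random.rand(4, 3)).sum(axis=0)
            >>> assert np.allclose(column_sums, 1.0)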
        """
        # Handle negative values
        x = np.maximum(x, self.stability_threshold)

        if x.ndim == 1:
            # For 1D arrays (e.g. beliefs)
            normalizer = x.sum() + self.stability_threshold
            if normalizer > self.stability_threshold:
                return x / normalizer
            return np.ones_like(x) / len(x)
        else:
            # For 2D arrays (e.g. matrices)
            normalizer = x.sum(axis=0, keepdims=True) + self.stability_threshold
            zero_cols = (normalizer < self.stability_threshold).flatten()
            if np.any(zero_cols):
                x[:, zero_cols] = 1.0 / x.shape[0]
            return x / normalizer

    def _compute_free_energy(self, beliefs: np.ndarray) -> float:
        """Compute variational free energy with numerical stability."""
        # Expected log-likelihood
        pred_obs = self.A @ beliefs  # Shape: (num_observations,)
        pred_obs = np.maximum(pred_obs, self.stability_threshold)

        # Compute log likelihood using the predicted observations
        log_likelihood = np.log(pred_obs + self.stability_threshold)  # Shape: (num_observations,)
        expected_ll = np.sum(pred_obs * log_likelihood)  # Scalar

        # KL divergence from prior
        beliefs = np.maximum(beliefs, self.stability_threshold)
        prior = np.maximum(self.D, self.stability_threshold)
        kl_div = np.sum(beliefs * (np.log(beliefs + self.stability_threshold) - np.log(prior + self.stability_threshold)))

        return -expected_ll + kl_div

    def _update_beliefs(self, observation: int, action: int) -> np.ndarray:
        """Update beliefs using variational inference with momentum and adaptive learning rate.

        This method implements belief updating using a gradient-based approach with
        several numerical stability improvements:
        1. Momentum to help escape local minima
        2. Adaptive learning rate based on belief certainty
        3. Numerical stability thresholds
        4. Convergence checking

        Args:
            observation: Index of the observed outcome
            action: The action taken

        Returns:
            Updated beliefs over states

        Implementation Details:
        - Uses Bayes rule with iterative optimization
        - Includes momentum term for better convergence
        - Adapts learning rate based on belief certainty
        - Ensures numerical stability throughout
        - Monitors convergence with early stopping
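
        Example:
            Illustrative sketch (arbitrary small dimensions): the returned
            posterior is a valid distribution over states.

            >>> pomdp = GenericPOMDP(num_observations=3, num_states=3, num_actions=2)
            >>> posterior = pomdp._update_beliefs(observation=0, action=1)
            >>> assert posterior.shape == (3,) and np.isclose(posterior.sum(), 1.0)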
        """
        # Get likelihood and transition matrices
        likelihood = self.A[observation, :]  # Observation likelihood
        transition = self.B[:, :, action]  # State transition matrix

        # Compute predicted beliefs (prior)
        predicted_beliefs = transition @ self.state.beliefs
        predicted_beliefs = self._normalize(predicted_beliefs)

        # Initialize variables for iterative update
        current_beliefs = predicted_beliefs.copy()
        prev_delta = np.zeros_like(current_beliefs)
        momentum = 0.9
        max_iters = 50
        min_delta = 1e-6
        base_lr = self.learning_rate  # Documented learning rate (default 0.1)

        for _ in range(max_iters):
            # Store old beliefs for convergence check
            old_beliefs = current_beliefs.copy()

            # Compute posterior using Bayes rule
            posterior = likelihood * current_beliefs
            posterior = self._normalize(posterior)

            # Compute gradient
            gradient = posterior - current_beliefs

            # Adaptive learning rate based on belief certainty
            certainty = 1.0 - self._compute_entropy(current_beliefs) / np.log(len(current_beliefs))
            lr = base_lr * (1.0 - 0.9 * certainty)

            # Update with momentum
            delta = lr * gradient + momentum * prev_delta
            current_beliefs += delta
            prev_delta = delta

            # Normalize
            current_beliefs = self._normalize(current_beliefs)

            # Check convergence
            if np.max(np.abs(current_beliefs - old_beliefs)) < min_delta:
                break

        return current_beliefs

    def _compute_expected_free_energy(
        self,
        beliefs: np.ndarray,
        action: Optional[int] = None,
        timestep: Optional[int] = None
    ) -> Union[float, Dict[str, np.ndarray]]:
        """Compute expected free energy components.

        Args:
            beliefs: Current belief state.
            action: Optional specific action to compute EFE for.
            timestep: Optional timestep used to index the preference matrix.

        Returns:
            If action is provided: float representing the total EFE for that action.
            If action is None: dictionary containing EFE components for all actions.
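
        Example:
            Illustrative sketch (arbitrary dimensions): a scalar EFE for a single
            action versus per-action components when no action is given.

            >>> pomdp = GenericPOMDP(num_observations=3, num_states=3, num_actions=2)
            >>> efe_0 = pomdp._compute_expected_free_energy(pomdp.state.beliefs, action=0)
            >>> assert np.isfinite(efe_0)
            >>> components = pomdp._compute_expected_free_energy(pomdp.state.beliefs)
            >>> assert components['total_efe'].shape == (pomdp.num_actions,)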
        """
        # If computing for a single action, return scalar total EFE
        if action is not None:
            # Get next state distribution
            next_beliefs = self.B[:, :, action] @ beliefs

            # Get predicted observations
            predicted_obs = self.A @ next_beliefs

            # Calculate ambiguity (entropy of the predicted state distribution)
            # Use stability threshold for consistency
            ambiguity = -np.sum(next_beliefs * np.log(next_beliefs + self.stability_threshold))

            # Timestep index into C with proper wrapping
            t = 0 if timestep is None else min(timestep, self.C.shape[1] - 1)

            # Calculate risk (down-weighted KL divergence from predicted to preferred outcomes)
            preferred_obs = self._softmax(self.C[:, t])
            risk = 0.00001 * np.sum(predicted_obs * np.log((predicted_obs + self.stability_threshold) / (preferred_obs + self.stability_threshold)))

            # Calculate expected preferences
            expected_preferences = 2.0 * np.sum(predicted_obs * self.C[:, t])  # Increased preference weight

            # Return total EFE as scalar
            return ambiguity + risk - expected_preferences

        # Otherwise compute components for all actions
        num_actions = self.num_actions
        ambiguity = np.zeros(num_actions)
        risk = np.zeros(num_actions)
        expected_preferences = np.zeros(num_actions)
        total_efe = np.zeros(num_actions)

        for a in range(num_actions):
            # Get next state distribution
            next_beliefs = self.B[:, :, a] @ beliefs

            # Get predicted observations
            predicted_obs = self.A @ next_beliefs

            # Calculate ambiguity (entropy of the predicted state distribution)
            ambiguity[a] = -np.sum(next_beliefs * np.log(next_beliefs + self.stability_threshold))

            # Timestep index into C with proper wrapping
            t = 0 if timestep is None else min(timestep, self.C.shape[1] - 1)

            # Calculate risk (down-weighted KL divergence from predicted to preferred outcomes)
            preferred_obs = self._softmax(self.C[:, t])
            risk[a] = 0.00001 * np.sum(predicted_obs * np.log((predicted_obs + self.stability_threshold) / (preferred_obs + self.stability_threshold)))

            # Calculate expected preferences
            expected_preferences[a] = 2.0 * np.sum(predicted_obs * self.C[:, t])  # Increased preference weight

            # Calculate total EFE
            total_efe[a] = ambiguity[a] + risk[a] - expected_preferences[a]

        return {
            'ambiguity': ambiguity,
            'risk': risk,
            'expected_preferences': expected_preferences,
            'total_efe': total_efe
        }

    def _select_action(self, action_values):
        """Select action using softmax with very low temperature."""
        # Use extremely low temperature to make selection more deterministic
        action_probs = self._softmax(-action_values / (0.000001 + self.stability_threshold))
        return np.random.choice(len(action_probs), p=action_probs)

    def _softmax(self, x: np.ndarray) -> np.ndarray:
        """Compute softmax probabilities with numerical stability.

        Args:
            x: Input array

        Returns:
            Softmax probabilities
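
        Example:
            Illustrative sketch: the output is a valid probability distribution
            that preserves the ordering of the inputs.

            >>> m = GenericPOMDP(num_observations=2, num_states=2, num_actions=2)
            >>> probs = m._softmax(np.array([1.0, 2.0, 3.0]))
            >>> assert np.isclose(probs.sum(), 1.0) and probs[2] > probs[0]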
        """
        # Clip values to prevent overflow/underflow
        x = np.clip(x, -500, 500)

        # Shift for numerical stability
        x_shifted = x - np.max(x)

        # Compute exponentials
        exp_x = np.exp(x_shifted)

        # Normalize with stability threshold
        return exp_x / (exp_x.sum() + self.stability_threshold)

    def _generate_observation(self) -> int:
        """Generate observation based on current beliefs."""
        # Expected observation distribution
        obs_probs = self.A @ self.state.beliefs
        obs_probs = self._normalize(obs_probs)  # Ensure valid distribution

        # Sample observation
        return np.random.choice(self.num_observations, p=obs_probs)

    def step(self, action: Optional[int] = None) -> Tuple[int, float]:
        """Take a step in the environment.

        Args:
            action: Optional action to take. If None, select action based on policy.

        Returns:
            Tuple of (observation, free energy).
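
        Example:
            Illustrative sketch (arbitrary dimensions; outputs are stochastic):
            running a few steps grows the history and advances the clock.

            >>> pomdp = GenericPOMDP(num_observations=3, num_states=3, num_actions=2)
            >>> for _ in range(3):
            ...     obs, fe = pomdp.step()
            >>> assert len(pomdp.state.history['observations']) == 3
            >>> assert pomdp.state.time_step == 3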
        """
        # Select action if not provided
        if action is None:
            # Get EFE components and policy probabilities
            components = self.get_efe_components(self.state.beliefs, self.state.time_step)
            action_posterior = components['action_posterior']

            # Sample action from posterior
            action = np.random.choice(self.num_actions, p=action_posterior)

        # Get next state distribution
        next_beliefs = self.B[:, :, action] @ self.state.beliefs

        # Get observation probabilities (renormalized to guard against rounding drift)
        obs_probs = self._normalize(self.A @ next_beliefs)

        # Sample observation
        observation = np.random.choice(self.num_observations, p=obs_probs)

        # Update beliefs
        updated_beliefs = self._update_beliefs(observation, action)

        # Update state
        self.state.beliefs = updated_beliefs
        self.state.time_step += 1

        # Calculate free energy for monitoring
        efe_components = self._compute_expected_free_energy(updated_beliefs, action)

        # Handle both scalar and dictionary return types
        if isinstance(efe_components, dict):
            free_energy = np.min(efe_components['total_efe'])
        else:
            free_energy = efe_components

        # Update history
        if 'observations' not in self.state.history:
            self.state.history['observations'] = []
        if 'actions' not in self.state.history:
            self.state.history['actions'] = []
        if 'free_energy' not in self.state.history:
            self.state.history['free_energy'] = []
        if 'beliefs' not in self.state.history:
            self.state.history['beliefs'] = []

        self.state.history['observations'].append(observation)
        self.state.history['actions'].append(action)
        self.state.history['free_energy'].append(free_energy)
        self.state.history['beliefs'].append(updated_beliefs.copy())

        return observation, free_energy

    def save_state(self, path: Union[str, Path]) -> None:
        """Save current state to file.

        Args:
            path: Path to save state to
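
        Example:
            Illustrative round trip through a temporary file (the file name is
            arbitrary), paired with load_state below.

            >>> import os, tempfile
            >>> pomdp = GenericPOMDP(num_observations=3, num_states=3, num_actions=2)
            >>> _ = pomdp.step()
            >>> with tempfile.TemporaryDirectory() as tmp:
            ...     pomdp.save_state(os.path.join(tmp, 'state.json'))
            ...     pomdp.load_state(os.path.join(tmp, 'state.json'))
            >>> assert pomdp.state.time_step == 1
            >>> assert len(pomdp.state.history['observations']) == 1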
        """
        path = Path(path)
        state_dict = {
            'beliefs': self.state.beliefs.tolist(),
            'time_step': int(self.state.time_step),
            'current_state': None if self.state.current_state is None else int(self.state.current_state),
            'history': {
                'observations': [int(x) for x in self.state.history['observations']],
                'actions': [None if x is None else int(x) for x in self.state.history['actions']],
                'beliefs': [b.tolist() for b in self.state.history['beliefs']],
                'free_energy': [float(x) for x in self.state.history['free_energy']],
                'efe_components': [
                    {k: v.tolist() if isinstance(v, np.ndarray) else
                     ([x.tolist() if isinstance(x, np.ndarray) else x for x in v] if isinstance(v, list) else v)
                     for k, v in comp.items()}
                    for comp in self.state.history['efe_components']
                ]
            }
        }

        with open(path, 'w') as f:
            json.dump(state_dict, f, indent=2, cls=NumpyEncoder)

    def load_state(self, path: Union[str, Path]) -> None:
        """Load state from file.

        Args:
            path: Path to load state from
        """
        path = Path(path)
        with open(path, 'r') as f:
            state_dict = json.load(f)

        # Create new state with loaded beliefs
        self.state = ModelState(
            beliefs=np.array(state_dict['beliefs']),
            time_step=int(state_dict['time_step']),
            current_state=None if state_dict['current_state'] is None else int(state_dict['current_state'])
        )

        # Load history with proper type conversion
        self.state.history = {
            'observations': [int(x) for x in state_dict['history']['observations']],
            'actions': [None if x is None else int(x) for x in state_dict['history']['actions']],
            'beliefs': [np.array(b) for b in state_dict['history']['beliefs']],
            'free_energy': [float(x) for x in state_dict['history']['free_energy']],
            'efe_components': [
                {k: (np.array(v) if isinstance(v, list) and v and not isinstance(v[0], list)
                     else [np.array(x) if isinstance(x, list) else x for x in v] if isinstance(v, list)
                     else v)
                 for k, v in comp.items()}
                for comp in state_dict['history']['efe_components']
            ]
        }

    def reset(self) -> None:
        """Reset the model to initial state."""
        self.state = ModelState(
            current_state=0,
            beliefs=self.D.copy(),
            time_step=0
        )
        self.state.history['beliefs'].append(self.state.beliefs.copy())

    def _compute_kl_divergence(self, p: np.ndarray, q: np.ndarray) -> float:
        """Compute KL divergence between two distributions.

        Args:
            p: First distribution
            q: Second distribution

        Returns:
            KL divergence value
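
        Example:
            Illustrative sketch: the divergence of a distribution from itself is
            zero, and it is positive for distinct distributions.

            >>> m = GenericPOMDP(num_observations=2, num_states=2, num_actions=2)
            >>> p = np.array([0.7, 0.3])
            >>> assert np.isclose(m._compute_kl_divergence(p, p), 0.0)
            >>> assert m._compute_kl_divergence(p, np.array([0.5, 0.5])) > 0.0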
        """
        # Add stability threshold to avoid log(0)
        p = p + self.stability_threshold
        q = q + self.stability_threshold

        # Normalize
        p = p / p.sum()
        q = q / q.sum()

        # Compute KL divergence
        return np.sum(p * (np.log(p) - np.log(q)))

    def get_efe_components(
        self,
        beliefs: Optional[np.ndarray] = None,
        timestep: Optional[int] = None
    ) -> Dict[str, Union[np.ndarray, List[List[int]]]]:
        """Get expected free energy components for all policies.

        This method computes the Expected Free Energy (EFE) components for all possible
        policies up to the planning horizon. The EFE is composed of:
        1. Ambiguity: Entropy of beliefs (epistemic value)
        2. Risk: KL divergence between predicted and preferred observations
        3. Expected Preferences: Expected value of future observations

        The total EFE is used to compute action probabilities through softmax.

        Args:
            beliefs: Optional beliefs to use instead of the current state
            timestep: Optional timestep (defaults to the current time step)

        Returns:
            Dictionary containing:
            - policies: List of action sequences
            - ambiguity: Epistemic value term
            - risk: Risk term from KL divergence
            - expected_preferences: Expected preference satisfaction
            - total_efe: Sum of components
            - action_posterior: Action selection probabilities

        Implementation Details:
        - Generates all possible policies up to the planning horizon
        - Computes EFE components for each policy
        - Sums preferences across the planning horizon (no explicit temporal discounting)
        - Ensures numerical stability in all computations
        - Normalizes distributions appropriately
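
        Example:
            Illustrative sketch (2 actions with the default planning horizon of 4,
            i.e. 2**4 = 16 candidate policies):

            >>> pomdp = GenericPOMDP(num_observations=3, num_states=3, num_actions=2)
            >>> out = pomdp.get_efe_components()
            >>> assert len(out['policies']) == 16
            >>> assert out['total_efe'].shape == (16,)
            >>> assert np.isclose(out['action_posterior'].sum(), 1.0)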
        """
        if beliefs is None:
            beliefs = self.state.beliefs
        if timestep is None:
            timestep = self.state.time_step

        # Generate all policies (action sequences) up to the planning horizon
        policies = [[a] for a in range(self.num_actions)]
        for _ in range(1, self.planning_horizon):
            new_policies = []
            for policy in policies:
                for a in range(self.num_actions):
                    new_policies.append(policy + [a])
            policies = new_policies

        num_policies = len(policies)
        ambiguity = np.zeros((num_policies, self.planning_horizon))
        risk = np.zeros((num_policies, self.planning_horizon))
        expected_preferences = np.zeros((num_policies, self.planning_horizon))
        total_efe = np.zeros(num_policies)

        # Compute components for each policy
        for p, policy in enumerate(policies):
            curr_beliefs = beliefs.copy()

            for t, action in enumerate(policy):
                # Get next state distribution
                next_beliefs = self.B[:, :, action] @ curr_beliefs

                # Get predicted observations
                pred_obs = self.A @ next_beliefs

                # Compute ambiguity (entropy of beliefs)
                ambiguity[p, t] = -np.sum(next_beliefs * np.log(next_beliefs + self.stability_threshold))

                # Compute risk (KL between predicted and preferred outcomes)
                preferred_obs = self._softmax(self.C[:, t] if t < self.C.shape[1] else self.C[:, -1])
                risk[p, t] = self._compute_kl_divergence(pred_obs, preferred_obs)

                # Store expected preferences
                expected_preferences[p, t] = np.sum(pred_obs * self.C[:, t if t < self.C.shape[1] else -1])

                # Update beliefs for next timestep
                curr_beliefs = next_beliefs

            # Compute total EFE for the policy by summing over the horizon
            total_efe[p] = np.sum(ambiguity[p, :] + risk[p, :] - expected_preferences[p, :])

        # Compute action posterior using softmax
        action_posterior = np.zeros(self.num_actions)
        for a in range(self.num_actions):
            # Sum probability mass of all policies starting with action a
            policy_indices = [i for i, p in enumerate(policies) if p[0] == a]
            action_posterior[a] = np.sum(self._softmax(-total_efe)[policy_indices])

        # Normalize action posterior
        action_posterior = self._normalize(action_posterior)

        return {
            'policies': policies,
            'ambiguity': ambiguity,
            'risk': risk,
            'expected_preferences': expected_preferences,
            'total_efe': total_efe,
            'action_posterior': action_posterior
        }

    def _compute_entropy(self, dist: np.ndarray) -> float:
        """Compute entropy of a distribution with numerical stability.

        Args:
            dist: Probability distribution

        Returns:
            Entropy value
        """
        # Ensure valid probabilities
        dist = np.maximum(dist, self.stability_threshold)
        dist = dist / (dist.sum() + self.stability_threshold)

        # Compute entropy with clipping to prevent log(0)
        return -np.sum(dist * np.log(dist + self.stability_threshold))

    def test_action_selection(self):
        """Test that action selection uses policy evaluation properly."""
        # Run action selection multiple times
        n_samples = 100
        selected_actions = []
        action_probs_list = []

        for _ in range(n_samples):
            # Get EFE components; the action posterior aggregates the (softmaxed)
            # policy EFEs for each initial action
            efe_components = self.get_efe_components()
            action_probs = efe_components['action_posterior']

            # Sample an action from the posterior
            action = np.random.choice(self.num_actions, p=action_probs)

            selected_actions.append(action)
            action_probs_list.append(action_probs)

        # Check that actions are selected with appropriate probabilities
        action_counts = np.bincount(selected_actions, minlength=self.num_actions)
        action_frequencies = action_counts / n_samples

        # Average action probabilities across samples
        avg_action_probs = np.mean(action_probs_list, axis=0)

        # Check that empirical frequencies roughly match predicted probabilities
        assert np.allclose(action_frequencies, avg_action_probs, atol=0.1)

        return action, action_probs
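

# ---------------------------------------------------------------------------
# Illustrative usage sketch (an assumption of this write-up, not part of the
# original module): build a small model with arbitrary dimensions, run a short
# simulation, and log the resulting free-energy trace. Guarded so that
# importing the module has no side effects.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    demo_model = GenericPOMDP(num_observations=3, num_states=4, num_actions=2)

    # Let the model pick actions by Expected Free Energy minimization
    for _ in range(10):
        obs, free_energy = demo_model.step()
        logger.info("t=%d obs=%d free_energy=%.3f",
                    demo_model.state.time_step, obs, free_energy)

    # Inspect the accumulated history
    logger.info("Actions taken: %s", demo_model.state.history['actions'])
    logger.info("Final beliefs: %s", np.round(demo_model.state.beliefs, 3))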