cognitive/Things/Generic_POMDP/generic_pomdp.py

"""
Generic POMDP Implementation.
This module implements a generic Partially Observable Markov Decision Process (POMDP)
using Active Inference principles. The implementation is designed to be flexible and
handle arbitrary discrete observation and state spaces.
Key Components:
- A matrix: Observation model P(o|s) mapping states to observations
- B matrix: Transition model P(s'|s,a) defining state dynamics under actions
- C matrix: Preference matrix over observations across time
- D matrix: Prior beliefs over initial states
- E matrix: Prior preferences over policies
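Example (illustrative sketch; dimensions are arbitrary):
>>> model = GenericPOMDP(num_observations=3, num_states=4, num_actions=2)
>>> observation, free_energy = model.step()  # act, observe, update beliefs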
"""
import numpy as np
import json
from dataclasses import dataclass, field
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Union
import logging
import itertools
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class NumpyEncoder(json.JSONEncoder):
"""JSON encoder that handles numpy types."""
def default(self, obj):
if isinstance(obj, np.ndarray):
return obj.tolist()
if isinstance(obj, np.integer):
return int(obj)
if isinstance(obj, np.floating):
return float(obj)
if isinstance(obj, np.bool_):
return bool(obj)
return super().default(obj)
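# Example (sketch): serialize a dict containing numpy values, e.g.
#     json.dumps({'beliefs': np.array([0.5, 0.5])}, cls=NumpyEncoder)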
@dataclass
class ModelState:
"""Class to hold the current state of the POMDP."""
beliefs: np.ndarray
time_step: int = 0
current_state: Optional[int] = None
history: Dict[str, List] = field(default_factory=lambda: {
'observations': [],
'actions': [],
'beliefs': [],
'free_energy': [],
'efe_components': [] # Store EFE components for visualization
})
def __post_init__(self):
"""Validate state after initialization."""
if len(self.beliefs.shape) != 1:
raise ValueError("Beliefs must be a 1D array")
if not np.allclose(self.beliefs.sum(), 1.0):
raise ValueError("Beliefs must sum to 1")
if not np.all(self.beliefs >= 0):
raise ValueError("Beliefs must be non-negative")
def update(self, observation: int, action: Optional[int], free_energy: float,
efe_components: Optional[Dict] = None) -> None:
"""Update state and history with new information."""
if action is not None:
self.history['actions'].append(int(action))
self.history['observations'].append(int(observation))
self.history['beliefs'].append(self.beliefs.copy())
self.history['free_energy'].append(float(free_energy))
if efe_components is not None:
self.history['efe_components'].append(efe_components)
self.time_step += 1
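# Example (sketch; GenericPOMDP constructs and manages this internally):
#     state = ModelState(beliefs=np.array([0.5, 0.5]))
#     state.update(observation=1, action=0, free_energy=0.0)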
class GenericPOMDP:
"""Generic POMDP implementation using Active Inference.
This class implements a Partially Observable Markov Decision Process using
Active Inference principles. It maintains beliefs over hidden states and
updates them based on observations and actions.
Key Features:
- Belief updating using variational inference
- Action selection using Expected Free Energy minimization
- Temporal preference learning
- Numerical stability handling
- State/history saving and loading
Matrix Dimensions:
- A: (num_observations, num_states) - Column stochastic
- B: (num_states, num_states, num_actions) - Column stochastic per action
- C: (num_observations, planning_horizon) - Preference values
- D: (num_states,) - Initial belief distribution
- E: (num_actions,) - Policy prior distribution
Attributes:
num_observations (int): Number of possible observations
num_states (int): Number of hidden states
num_actions (int): Number of possible actions
planning_horizon (int): Number of timesteps to plan ahead
max_policies (int): Maximum number of policies to evaluate
temperature (float): Temperature parameter for action selection
learning_rate (float): Learning rate for belief updates
stability_threshold (float): Small constant for numerical stability
state (ModelState): Current state of the POMDP
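Example (illustrative sketch; dimensions are arbitrary):
>>> m = GenericPOMDP(num_observations=3, num_states=4, num_actions=2)
>>> m.A.shape, m.B.shape, m.D.shape
((3, 4), (4, 4, 2), (4,))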
"""
def __init__(
self,
num_observations: int,
num_states: int,
num_actions: int,
planning_horizon: int = 4,
max_policies: int = 1000,
temperature: float = 1.0,
learning_rate: float = 0.1,
stability_threshold: float = 1e-16
):
"""Initialize the POMDP.
Args:
num_observations: Number of possible observations
num_states: Number of hidden states
num_actions: Number of possible actions
planning_horizon: Number of timesteps to plan ahead (default: 4)
max_policies: Maximum number of policies to evaluate
temperature: Temperature parameter for action selection
Higher values lead to more exploration
learning_rate: Learning rate for belief updates
Higher values lead to faster but potentially less stable updates
stability_threshold: Small constant for numerical stability
Used to prevent division by zero and log(0)
Raises:
ValueError: If any dimension is not positive
"""
# Validate inputs
if any(x <= 0 for x in [num_observations, num_states, num_actions]):
raise ValueError("Dimensions must be positive integers")
self.num_observations = num_observations
self.num_states = num_states
self.num_actions = num_actions
self.planning_horizon = planning_horizon
self.max_policies = max_policies
self.temperature = temperature
self.learning_rate = learning_rate
self.stability_threshold = stability_threshold
# Initialize matrices
self._initialize_matrices()
# Initialize state
self.state = ModelState(
current_state=0,
beliefs=self.D.copy(),
time_step=0
)
self.state.history['beliefs'].append(self.state.beliefs.copy())
def _initialize_matrices(self):
"""Initialize the model matrices with reasonable defaults."""
# A matrix: Observation model P(o|s)
self.A = np.random.rand(self.num_observations, self.num_states)
self.A = self._normalize(self.A) # Column stochastic
# B matrix: Transition model P(s'|s,a)
self.B = np.zeros((self.num_states, self.num_states, self.num_actions))
for a in range(self.num_actions):
B_a = np.eye(self.num_states) * 0.8 # Identity-based with high self-transition
B_a += np.random.rand(self.num_states, self.num_states) * 0.2 # Add random transitions
self.B[:,:,a] = self._normalize(B_a) # Column stochastic
# C matrix: Preferences over observations (fixed through time)
self.C = np.zeros((self.num_observations, self.planning_horizon)) # Initialize with zeros
# Set strong preference for observation 0
base_preferences = np.zeros(self.num_observations)
base_preferences[0] = 2.0 # Strong positive preference for observation 0
base_preferences[1:] = -0.5 # Slight negative preference for other observations
for t in range(self.planning_horizon):
self.C[:, t] = base_preferences
# D matrix: Prior beliefs over states
self.D = np.ones(self.num_states) / self.num_states # Uniform prior
# E matrix: Prior over policies
self.E = np.ones(self.num_actions) / self.num_actions # Uniform prior
def _normalize(self, x: np.ndarray) -> np.ndarray:
"""Normalize array to sum to 1 along first axis with numerical stability.
Args:
x: Array to normalize
Returns:
Normalized array
"""
# Clip negatives and zeros so every entry carries a small positive mass
x = np.maximum(x, self.stability_threshold)
if x.ndim == 1:
# For 1D arrays (e.g. belief vectors)
return x / x.sum()
# For 2D arrays (e.g. matrices): make each column sum to 1
return x / x.sum(axis=0, keepdims=True)
def _compute_free_energy(self, beliefs: np.ndarray) -> float:
"""Compute variational free energy with numerical stability."""
# Accuracy term: negative entropy of the predicted observation distribution
# (no observation is passed in, so this serves as an expected-likelihood proxy)
pred_obs = self.A @ beliefs  # Shape: (num_observations,)
pred_obs = np.maximum(pred_obs, self.stability_threshold)
log_likelihood = np.log(pred_obs)  # Shape: (num_observations,)
expected_ll = np.sum(pred_obs * log_likelihood)  # Scalar
# KL divergence from prior
beliefs = np.maximum(beliefs, self.stability_threshold)
prior = np.maximum(self.D, self.stability_threshold)
kl_div = np.sum(beliefs * (np.log(beliefs + self.stability_threshold) - np.log(prior + self.stability_threshold)))
return -expected_ll + kl_div
def _update_beliefs(self, observation: int, action: int) -> np.ndarray:
"""Update beliefs using variational inference with momentum and adaptive learning rate.
This method implements belief updating using a gradient-based approach with
several numerical stability improvements:
1. Momentum to help escape local minima
2. Adaptive learning rate based on belief certainty
3. Numerical stability thresholds
4. Convergence checking
Args:
observation: The observed state
action: The action taken
Returns:
Updated beliefs over states
Implementation Details:
- Uses Bayes rule with iterative optimization
- Includes momentum term for better convergence
- Adapts learning rate based on belief certainty
- Ensures numerical stability throughout
- Monitors convergence with early stopping
"""
# Get likelihood and transition matrices
likelihood = self.A[observation, :] # Observation likelihood
transition = self.B[:, :, action] # State transition matrix
# Compute predicted beliefs (prior)
predicted_beliefs = transition @ self.state.beliefs
predicted_beliefs = self._normalize(predicted_beliefs)
# Initialize variables for iterative update
current_beliefs = predicted_beliefs.copy()
prev_delta = np.zeros_like(current_beliefs)
momentum = 0.9
max_iters = 50
min_delta = 1e-6
base_lr = self.learning_rate  # Use the configured learning rate
for _ in range(max_iters):
# Store old beliefs for convergence check
old_beliefs = current_beliefs.copy()
# Compute posterior using Bayes rule
posterior = likelihood * current_beliefs
posterior = self._normalize(posterior)
# Compute gradient
gradient = posterior - current_beliefs
# Adaptive learning rate based on belief certainty
certainty = 1.0 - self._compute_entropy(current_beliefs) / np.log(len(current_beliefs))
lr = base_lr * (1.0 - 0.9 * certainty)
# Update with momentum
delta = lr * gradient + momentum * prev_delta
current_beliefs += delta
prev_delta = delta
# Normalize
current_beliefs = self._normalize(current_beliefs)
# Check convergence
if np.max(np.abs(current_beliefs - old_beliefs)) < min_delta:
break
return current_beliefs
def _compute_expected_free_energy(
self,
beliefs: np.ndarray,
action: Optional[int] = None,
timestep: Optional[int] = None
) -> Union[float, Dict[str, np.ndarray]]:
"""Compute expected free energy components.
Args:
beliefs: Current belief state.
action: Optional specific action to compute EFE for.
timestep: Optional timestep for temporal discounting.
Returns:
If action is provided: float representing total EFE for that action
If action is None: Dictionary containing EFE components for all actions
"""
# If computing for a single action, return scalar total EFE
if action is not None:
# Get next state distribution
next_beliefs = self.B[:, :, action] @ beliefs
# Get predicted observations
predicted_obs = self.A @ next_beliefs
# Calculate ambiguity (entropy of the predicted state distribution)
ambiguity = -np.sum(next_beliefs * np.log(next_beliefs + self.stability_threshold))
# Clamp the timestep to the preference horizon
t = 0 if timestep is None else min(timestep, self.C.shape[1] - 1)
# Calculate risk: KL divergence from predicted to preferred observations
preferred_obs = self._softmax(self.C[:, t])
risk = self._compute_kl_divergence(predicted_obs, preferred_obs)
# Calculate expected preferences
expected_preferences = 2.0 * np.sum(predicted_obs * self.C[:, t])  # Increased preference weight
# Return total EFE as scalar
return ambiguity + risk - expected_preferences
# Otherwise compute components for all actions
num_actions = self.num_actions
ambiguity = np.zeros(num_actions)
risk = np.zeros(num_actions)
expected_preferences = np.zeros(num_actions)
total_efe = np.zeros(num_actions)
# Clamp the timestep to the preference horizon (shared across actions)
t = 0 if timestep is None else min(timestep, self.C.shape[1] - 1)
preferred_obs = self._softmax(self.C[:, t])
for a in range(num_actions):
# Get next state distribution
next_beliefs = self.B[:, :, a] @ beliefs
# Get predicted observations
predicted_obs = self.A @ next_beliefs
# Calculate ambiguity (entropy of the predicted state distribution)
ambiguity[a] = -np.sum(next_beliefs * np.log(next_beliefs + self.stability_threshold))
# Calculate risk: KL divergence from predicted to preferred observations
risk[a] = self._compute_kl_divergence(predicted_obs, preferred_obs)
# Calculate expected preferences
expected_preferences[a] = 2.0 * np.sum(predicted_obs * self.C[:, t])  # Increased preference weight
# Calculate total EFE
total_efe[a] = ambiguity[a] + risk[a] - expected_preferences[a]
return {
'ambiguity': ambiguity,
'risk': risk,
'expected_preferences': expected_preferences,
'total_efe': total_efe
}
def _select_action(self, action_values):
"""Select an action via softmax over negative action values (e.g. EFE).
Lower temperature makes selection more deterministic.
"""
action_probs = self._softmax(-action_values / (self.temperature + self.stability_threshold))
return np.random.choice(len(action_probs), p=action_probs)
def _softmax(self, x: np.ndarray) -> np.ndarray:
"""Compute softmax probabilities with numerical stability.
Args:
x: Input array
Returns:
Softmax probabilities
"""
# Clip values to prevent overflow/underflow
x = np.clip(x, -500, 500)
# Shift for numerical stability
x_shifted = x - np.max(x)
# Compute exponentials
exp_x = np.exp(x_shifted)
# Normalize with stability threshold
return exp_x / (exp_x.sum() + self.stability_threshold)
def _generate_observation(self) -> int:
"""Generate observation based on current beliefs."""
# Expected observation distribution
obs_probs = self.A @ self.state.beliefs
obs_probs = self._normalize(obs_probs) # Ensure valid distribution
# Sample observation
return np.random.choice(self.num_observations, p=obs_probs)
def step(self, action: Optional[int] = None) -> Tuple[int, float]:
"""Take a step in the environment.
Args:
action: Optional action to take. If None, select action based on policy.
Returns:
Tuple of (observation, free energy).
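Example (sketch; assumes `model` is an initialized GenericPOMDP):
>>> obs, fe = model.step()          # action sampled from the EFE posterior
>>> obs, fe = model.step(action=0)  # take a specified action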
"""
# Select action if not provided
if action is None:
# Get EFE components and policy probabilities
components = self.get_efe_components(self.state.beliefs, self.state.time_step)
action_posterior = components['action_posterior']
# Sample action from posterior
action = np.random.choice(self.num_actions, p=action_posterior)
# Get next state distribution
next_beliefs = self.B[:, :, action] @ self.state.beliefs
# Get observation probabilities (renormalize to guard against float drift)
obs_probs = self._normalize(self.A @ next_beliefs)
# Sample observation
observation = np.random.choice(self.num_observations, p=obs_probs)
# Update beliefs
updated_beliefs = self._update_beliefs(observation, action)
# Update state
self.state.beliefs = updated_beliefs
self.state.time_step += 1
# Calculate free energy for monitoring; with an action given,
# _compute_expected_free_energy returns a scalar
free_energy = self._compute_expected_free_energy(updated_beliefs, action)
# Update history
if 'observations' not in self.state.history:
self.state.history['observations'] = []
if 'actions' not in self.state.history:
self.state.history['actions'] = []
if 'free_energy' not in self.state.history:
self.state.history['free_energy'] = []
if 'beliefs' not in self.state.history:
self.state.history['beliefs'] = []
self.state.history['observations'].append(observation)
self.state.history['actions'].append(action)
self.state.history['free_energy'].append(free_energy)
self.state.history['beliefs'].append(updated_beliefs.copy())
return observation, free_energy
def save_state(self, path: Union[str, Path]) -> None:
"""Save current state to file.
Args:
path: Path to save state to
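Example (round-trip sketch with a hypothetical path):
>>> model.save_state('pomdp_state.json')
>>> model.load_state('pomdp_state.json')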
"""
path = Path(path)
state_dict = {
'beliefs': self.state.beliefs.tolist(),
'time_step': int(self.state.time_step),
'current_state': None if self.state.current_state is None else int(self.state.current_state),
'history': {
'observations': [int(x) for x in self.state.history['observations']],
'actions': [None if x is None else int(x) for x in self.state.history['actions']],
'beliefs': [b.tolist() for b in self.state.history['beliefs']],
'free_energy': [float(x) for x in self.state.history['free_energy']],
# NumpyEncoder converts any numpy arrays nested in the EFE components
'efe_components': self.state.history['efe_components']
}
}
with open(path, 'w') as f:
json.dump(state_dict, f, indent=2, cls=NumpyEncoder)
def load_state(self, path: Union[str, Path]) -> None:
"""Load state from file.
Args:
path: Path to load state from
"""
path = Path(path)
with open(path, 'r') as f:
state_dict = json.load(f)
# Create new state with loaded beliefs
self.state = ModelState(
beliefs=np.array(state_dict['beliefs']),
time_step=int(state_dict['time_step']),
current_state=None if state_dict['current_state'] is None else int(state_dict['current_state'])
)
# Load history with proper type conversion
self.state.history = {
'observations': [int(x) for x in state_dict['history']['observations']],
'actions': [None if x is None else int(x) for x in state_dict['history']['actions']],
'beliefs': [np.array(b) for b in state_dict['history']['beliefs']],
'free_energy': [float(x) for x in state_dict['history']['free_energy']],
'efe_components': [
{k: np.array(v) if isinstance(v, list) and k != 'policies' else v
for k, v in comp.items()}
for comp in state_dict['history']['efe_components']
]
}
def reset(self) -> None:
"""Reset the model to initial state."""
self.state = ModelState(
current_state=0,
beliefs=self.D.copy(),
time_step=0
)
self.state.history['beliefs'].append(self.state.beliefs.copy())
def _compute_kl_divergence(self, p: np.ndarray, q: np.ndarray) -> float:
"""Compute KL divergence between two distributions.
Args:
p: First distribution
q: Second distribution
Returns:
KL divergence value
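Example (sketch): identical distributions give zero divergence, e.g.
_compute_kl_divergence(np.array([0.5, 0.5]), np.array([0.5, 0.5])) == 0.0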
"""
# Add stability threshold to avoid log(0)
p = p + self.stability_threshold
q = q + self.stability_threshold
# Normalize
p = p / p.sum()
q = q / q.sum()
# Compute KL divergence
return np.sum(p * (np.log(p) - np.log(q)))
def get_efe_components(
self,
beliefs: Optional[np.ndarray] = None,
timestep: Optional[int] = None
) -> Dict[str, Union[np.ndarray, List[List[int]]]]:
"""Get expected free energy components for all policies.
This method computes the Expected Free Energy (EFE) components for all possible
policies up to the planning horizon. The EFE is composed of:
1. Ambiguity: Entropy of beliefs (epistemic value)
2. Risk: KL divergence between predicted and preferred observations
3. Expected Preferences: Expected value of future observations
The total EFE is used to compute action probabilities through softmax.
Args:
beliefs: Optional beliefs to use instead of current state
timestep: Optional timestep for temporal discounting
Returns:
Dictionary containing:
- policies: List of action sequences
- ambiguity: Epistemic value term
- risk: Risk term from KL divergence
- expected_preferences: Expected preference satisfaction
- total_efe: Sum of components
- action_posterior: Action selection probabilities
Implementation Details:
- Generates all possible policies up to planning horizon
- Computes EFE components for each policy
- Sums EFE components over each policy's horizon (no explicit discounting)
- Ensures numerical stability in all computations
- Normalizes distributions appropriately
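Example (sketch; assumes `model` is an initialized GenericPOMDP):
>>> comps = model.get_efe_components()
>>> action = np.random.choice(model.num_actions, p=comps['action_posterior'])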
"""
if beliefs is None:
beliefs = self.state.beliefs
if timestep is None:
timestep = self.state.time_step
# Enumerate all action sequences up to the planning horizon
policies = [list(p) for p in itertools.product(range(self.num_actions), repeat=self.planning_horizon)]
num_policies = len(policies)
ambiguity = np.zeros((num_policies, self.planning_horizon))
risk = np.zeros((num_policies, self.planning_horizon))
expected_preferences = np.zeros((num_policies, self.planning_horizon))
total_efe = np.zeros(num_policies)
# Compute components for each policy
for p, policy in enumerate(policies):
curr_beliefs = beliefs.copy()
for t, action in enumerate(policy):
# Get next state distribution
next_beliefs = self.B[:, :, action] @ curr_beliefs
# Get predicted observations
pred_obs = self.A @ next_beliefs
# Compute ambiguity (entropy of beliefs)
ambiguity[p, t] = -np.sum(next_beliefs * np.log(next_beliefs + self.stability_threshold))
# Compute risk (KL between predicted and preferred outcomes)
preferred_obs = self._softmax(self.C[:, t] if t < self.C.shape[1] else self.C[:, -1])
risk[p, t] = self._compute_kl_divergence(pred_obs, preferred_obs)
# Store expected preferences
expected_preferences[p, t] = np.sum(pred_obs * self.C[:, t if t < self.C.shape[1] else -1])
# Update beliefs for next timestep
curr_beliefs = next_beliefs
# Total EFE for the policy, summed over its horizon
total_efe[p] = np.sum(ambiguity[p, :] + risk[p, :] - expected_preferences[p, :])
# Compute action posterior: marginalize softmax policy probabilities
# onto their first actions
policy_probs = self._softmax(-total_efe)
action_posterior = np.zeros(self.num_actions)
for a in range(self.num_actions):
# Sum probability mass of all policies starting with action a
policy_indices = [i for i, p in enumerate(policies) if p[0] == a]
action_posterior[a] = np.sum(policy_probs[policy_indices])
# Normalize action posterior
action_posterior = self._normalize(action_posterior)
return {
'policies': policies,
'ambiguity': ambiguity,
'risk': risk,
'expected_preferences': expected_preferences,
'total_efe': total_efe,
'action_posterior': action_posterior
}
def _compute_entropy(self, dist: np.ndarray) -> float:
"""Compute entropy of a distribution with numerical stability.
Args:
dist: Probability distribution
Returns:
Entropy value
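Example (sketch): a uniform distribution over N outcomes has entropy log(N),
e.g. _compute_entropy(np.ones(4) / 4) is approximately log(4) ≈ 1.386.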
"""
# Ensure valid probabilities
dist = np.maximum(dist, self.stability_threshold)
dist = dist / (dist.sum() + self.stability_threshold)
# Compute entropy with clipping to prevent log(0)
return -np.sum(dist * np.log(dist + self.stability_threshold))
def test_action_selection(self):
"""Test that action selection uses policy evaluation properly."""
# Run action selection multiple times
n_samples = 100
selected_actions = []
action_probs_list = []
for _ in range(n_samples):
# Get EFE components and the action posterior they induce
efe_components = self.get_efe_components()
action_probs = efe_components['action_posterior']
action = np.random.choice(self.num_actions, p=action_probs)
selected_actions.append(action)
action_probs_list.append(action_probs)
# Check that actions are selected with appropriate probabilities
action_counts = np.bincount(selected_actions, minlength=self.num_actions)
action_frequencies = action_counts / n_samples
# Average action probabilities across samples
avg_action_probs = np.mean(action_probs_list, axis=0)
# Check that empirical frequencies roughly match predicted probabilities
assert np.allclose(action_frequencies, avg_action_probs, atol=0.1)
return action, action_probs