# cognitive/Things/Generic_POMDP/test_generic_pomdp.py
"""
Tests for Generic POMDP implementation.
"""
import matplotlib
matplotlib.use('Agg') # Use non-interactive backend
import matplotlib.pyplot as plt
import pytest
import numpy as np
import yaml
import logging
import sys
from pathlib import Path
from datetime import datetime
from generic_pomdp import GenericPOMDP, ModelState
from matplotlib.gridspec import GridSpec
# Configure comprehensive logging
def setup_logging():
"""Configure logging to both file and console with detailed formatting."""
# Create logs directory if it doesn't exist
log_dir = Path(__file__).parent / "Output/logs"
log_dir.mkdir(parents=True, exist_ok=True)
# Create timestamped log file
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
log_file = log_dir / f"test_run_{timestamp}.log"
# Configure root logger
root_logger = logging.getLogger()
root_logger.setLevel(logging.INFO)
# Detailed formatter
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# File handler with full output
file_handler = logging.FileHandler(log_file)
file_handler.setLevel(logging.INFO)
file_handler.setFormatter(formatter)
root_logger.addHandler(file_handler)
# Console handler mirroring log output to stdout
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setLevel(logging.INFO)
console_handler.setFormatter(formatter)
root_logger.addHandler(console_handler)
return log_file
# Setup logging before tests run
log_file = setup_logging()
logger = logging.getLogger(__name__)
@pytest.fixture
def config():
"""Load configuration file."""
config_path = Path(__file__).parent / "configuration.yaml"
with open(config_path, 'r') as f:
return yaml.safe_load(f)
@pytest.fixture
def configured_pomdp(config):
"""Create POMDP from configuration file."""
dims = config['dimensions']
return GenericPOMDP(
num_observations=dims['observations'],
num_states=dims['states'],
num_actions=dims['actions'],
planning_horizon=dims['planning_horizon']
)
@pytest.fixture
def small_pomdp():
"""Create small POMDP for testing."""
return GenericPOMDP(
num_observations=2,
num_states=3,
num_actions=2,
planning_horizon=4  # matches planning_horizon in configuration.yaml
)
@pytest.fixture
def medium_pomdp():
"""Create medium POMDP for testing."""
return GenericPOMDP(
num_observations=4,
num_states=5,
num_actions=3,
planning_horizon=4  # matches planning_horizon in configuration.yaml
)
@pytest.fixture(scope="session", autouse=True)
def log_global_test_info(request):
"""Log global test information at start and end of test session."""
logger.info("=" * 80)
logger.info("Starting test session")
logger.info(f"Log file: {log_file}")
logger.info(f"Python version: {sys.version}")
logger.info("=" * 80)
yield
logger.info("=" * 80)
logger.info("Test session completed")
logger.info("=" * 80)
@pytest.fixture(scope="session")
def output_dir():
"""Create and return output directory structure."""
base_dir = Path(__file__).parent / "Output"
dirs = {
"logs": base_dir / "logs",
"plots": base_dir / "plots",
"test_results": base_dir / "test_results",
"simulations": base_dir / "simulations"
}
# Create all directories
for dir_path in dirs.values():
dir_path.mkdir(parents=True, exist_ok=True)
return dirs
def test_configuration_dimensions(config):
"""Test configuration file dimensions are valid."""
dims = config['dimensions']
logger.info(f"Testing configuration dimensions:")
logger.info(f"Observations: {dims['observations']}")
logger.info(f"States: {dims['states']}")
logger.info(f"Actions: {dims['actions']}")
logger.info(f"Planning horizon: {dims['planning_horizon']}")
logger.info(f"Total timesteps: {dims['total_timesteps']}")
# Validate dimensions are positive integers
assert dims['observations'] > 0, "Number of observations must be positive"
assert dims['states'] > 0, "Number of states must be positive"
assert dims['actions'] > 0, "Number of actions must be positive"
assert dims['planning_horizon'] > 0, "Planning horizon must be positive"
assert dims['total_timesteps'] > 0, "Total timesteps must be positive"
logger.info("All configuration dimensions validated successfully")
def test_initialization(configured_pomdp, config):
"""Test model initialization and dimensionality."""
dims = config['dimensions']
logger.info("Checking model dimensions match configuration:")
# Check dimensions
assert configured_pomdp.num_observations == dims['observations'], \
f"Expected {dims['observations']} observations, got {configured_pomdp.num_observations}"
logger.info(f"✓ Observations dimension: {configured_pomdp.num_observations}")
assert configured_pomdp.num_states == dims['states'], \
f"Expected {dims['states']} states, got {configured_pomdp.num_states}"
logger.info(f"✓ States dimension: {configured_pomdp.num_states}")
assert configured_pomdp.num_actions == dims['actions'], \
f"Expected {dims['actions']} actions, got {configured_pomdp.num_actions}"
logger.info(f"✓ Actions dimension: {configured_pomdp.num_actions}")
# Check matrix shapes
logger.info("\nValidating matrix shapes:")
A_shape = configured_pomdp.A.shape
expected_A = (dims['observations'], dims['states'])
assert A_shape == expected_A, f"A matrix shape mismatch. Expected {expected_A}, got {A_shape}"
logger.info(f"✓ A matrix shape: {A_shape}")
B_shape = configured_pomdp.B.shape
expected_B = (dims['states'], dims['states'], dims['actions'])
assert B_shape == expected_B, f"B matrix shape mismatch. Expected {expected_B}, got {B_shape}"
logger.info(f"✓ B matrix shape: {B_shape}")
C_shape = configured_pomdp.C.shape
expected_C = (dims['observations'], dims['planning_horizon'])
assert C_shape == expected_C, f"C matrix shape mismatch. Expected {expected_C}, got {C_shape}"
logger.info(f"✓ C matrix shape: {C_shape}")
D_shape = configured_pomdp.D.shape
expected_D = (dims['states'],)
assert D_shape == expected_D, f"D matrix shape mismatch. Expected {expected_D}, got {D_shape}"
logger.info(f"✓ D matrix shape: {D_shape}")
E_shape = configured_pomdp.E.shape
expected_E = (dims['actions'],)
assert E_shape == expected_E, f"E matrix shape mismatch. Expected {expected_E}, got {E_shape}"
logger.info(f"✓ E matrix shape: {E_shape}")
# Check initial state
assert isinstance(configured_pomdp.state, ModelState)
assert configured_pomdp.state.beliefs.shape == (dims['states'],)
assert configured_pomdp.state.time_step == 0
logger.info("\n✓ Initial state validated successfully")
logger.info("\nAll initialization checks passed successfully")
def test_matrix_properties(small_pomdp):
"""Test matrix properties."""
# A matrix (observation model)
assert np.allclose(small_pomdp.A.sum(axis=0), 1.0) # Column stochastic
assert np.all(small_pomdp.A >= 0) # Non-negative
# B matrix (transition model)
for a in range(small_pomdp.num_actions):
assert np.allclose(small_pomdp.B[:,:,a].sum(axis=0), 1.0) # Column stochastic
assert np.all(small_pomdp.B[:,:,a] >= 0) # Non-negative
# C matrix (preferences)
assert np.all(np.isfinite(small_pomdp.C)) # Finite values
# D matrix (prior beliefs)
assert np.allclose(small_pomdp.D.sum(), 1.0) # Normalized
assert np.all(small_pomdp.D >= 0) # Non-negative
# E matrix (policy prior)
assert np.allclose(small_pomdp.E.sum(), 1.0) # Normalized
assert np.all(small_pomdp.E >= 0) # Non-negative
def test_step_without_action(small_pomdp):
"""Test stepping without providing action."""
observation, free_energy = small_pomdp.step()
# Check observation
assert 0 <= observation < small_pomdp.num_observations
# Check free energy
assert np.isfinite(free_energy)
# Check history updated
assert len(small_pomdp.state.history['observations']) == 1
assert len(small_pomdp.state.history['actions']) == 1
assert len(small_pomdp.state.history['beliefs']) == 2 # Initial + updated
assert len(small_pomdp.state.history['free_energy']) == 1
def test_step_with_action(small_pomdp):
"""Test stepping with provided action."""
action = 0
observation, free_energy = small_pomdp.step(action)
# Check observation
assert 0 <= observation < small_pomdp.num_observations
# Check free energy
assert np.isfinite(free_energy)
# Check correct action taken
assert small_pomdp.state.history['actions'][-1] == action
def test_belief_updating(small_pomdp):
"""Test belief updating mechanism."""
initial_beliefs = small_pomdp.state.beliefs.copy()
# Take step
small_pomdp.step()
# Check beliefs updated
assert not np.allclose(small_pomdp.state.beliefs, initial_beliefs)
assert np.allclose(small_pomdp.state.beliefs.sum(), 1.0) # Still normalized
assert np.all(small_pomdp.state.beliefs >= 0) # Still non-negative
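# The next test reproduces the expected action-selection rule inline. A minimal
# sketch of that rule, mirroring the test body (not necessarily the library's
# internal code path):
#   G[a]  = min over policies pi with pi[0] == a of total_efe[pi]
#   p(a)  = softmax(-G / temperature), with temperature = 0.001 + stability_threshold
#   a     ~ Categorical(p(a))
# so actions whose best continuation has lower expected free energy are sampled
# more often.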
def test_action_selection(small_pomdp):
"""Test that action selection uses policy evaluation properly."""
# Run action selection multiple times
n_samples = 100
selected_actions = []
action_probs_list = []
for _ in range(n_samples):
# Get EFE components to compute action values
efe_components = small_pomdp.get_efe_components()
policies = efe_components['policies']
total_efe = efe_components['total_efe']
# Group policies by their first action and take the minimum EFE for each first action
action_values = np.full(small_pomdp.num_actions, np.inf)
for action_idx in range(small_pomdp.num_actions):
# Find policies that start with this action
matching_policies = [i for i, p in enumerate(policies) if p[0] == action_idx]
if matching_policies:
# Take the minimum EFE among policies starting with this action
action_values[action_idx] = np.min(total_efe[matching_policies])
# Get action probabilities
action_probs = small_pomdp._softmax(-action_values / (0.001 + small_pomdp.stability_threshold))
action = np.random.choice(len(action_probs), p=action_probs)
selected_actions.append(action)
action_probs_list.append(action_probs)
# Convert to numpy array for easier analysis
selected_actions = np.array(selected_actions)
# Compute empirical action frequencies
action_counts = np.bincount(selected_actions, minlength=small_pomdp.num_actions)
empirical_frequencies = action_counts / n_samples
# Average action probabilities across samples
avg_action_probs = np.mean(action_probs_list, axis=0)
# Check that empirical frequencies roughly match expected probabilities
np.testing.assert_allclose(empirical_frequencies, avg_action_probs, atol=0.1)
# Additional assertions to verify action selection properties
assert len(selected_actions) == n_samples, "Wrong number of selected actions"
assert all(0 <= a < small_pomdp.num_actions for a in selected_actions), "Invalid action selected"
assert all(np.isclose(sum(probs), 1.0) for probs in action_probs_list), "Action probabilities don't sum to 1"
def test_save_load_state(small_pomdp, tmp_path):
"""Test state saving and loading."""
# Take some steps
for _ in range(3):
small_pomdp.step()
# Save state
save_path = tmp_path / "test_state.yaml"
small_pomdp.save_state(save_path)
# Create new model
new_model = GenericPOMDP(
num_observations=2,
num_states=3,
num_actions=2,
planning_horizon=4  # matches planning_horizon in configuration.yaml
)
# Load state
new_model.load_state(save_path)
# Check state loaded correctly
assert new_model.state.current_state == small_pomdp.state.current_state
assert np.allclose(new_model.state.beliefs, small_pomdp.state.beliefs)
assert new_model.state.time_step == small_pomdp.state.time_step
# Check history loaded correctly
for key in small_pomdp.state.history:
assert len(new_model.state.history[key]) == len(small_pomdp.state.history[key])
for i in range(len(small_pomdp.state.history[key])):
if isinstance(small_pomdp.state.history[key][i], dict):
# For efe_components dictionaries
for k, v in small_pomdp.state.history[key][i].items():
if isinstance(v, (np.ndarray, list)):
assert np.allclose(np.array(new_model.state.history[key][i][k]), np.array(v))
else:
assert new_model.state.history[key][i][k] == v
elif isinstance(small_pomdp.state.history[key][i], (np.ndarray, list)):
assert np.allclose(np.array(new_model.state.history[key][i]), np.array(small_pomdp.state.history[key][i]))
else:
assert new_model.state.history[key][i] == small_pomdp.state.history[key][i]
def test_numerical_stability(small_pomdp):
"""Test numerical stability of computations."""
# Set very small beliefs
small_pomdp.state.beliefs = np.ones(small_pomdp.num_states) * 1e-10
small_pomdp.state.beliefs /= small_pomdp.state.beliefs.sum()
# Should not raise and give finite results
observation, free_energy = small_pomdp.step()
assert np.isfinite(free_energy)
assert np.all(np.isfinite(small_pomdp.state.beliefs))
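# Belief entropy H(b) = -sum_i b_i * log(b_i) is used below as a summary of how
# concentrated the posterior is; for a uniform belief over n states it equals
# log(n), which is the initial_entropy bound computed in the next test.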
def test_learning_dynamics(medium_pomdp):
"""Test learning dynamics over multiple steps."""
# Track belief entropy, beliefs and free energy
entropies = []
beliefs_history = []
free_energies = []
# Run for multiple steps
n_steps = 50 # Increased number of steps
for _ in range(n_steps):
obs, fe = medium_pomdp.step()
beliefs = medium_pomdp.state.beliefs.copy()
beliefs_history.append(beliefs)
entropy = -np.sum(beliefs * np.log(beliefs + medium_pomdp.stability_threshold))
entropies.append(entropy)
free_energies.append(fe)
# Test 1: Entropy should stay within reasonable bounds
initial_entropy = -np.sum(np.ones(medium_pomdp.num_states)/medium_pomdp.num_states *
np.log(np.ones(medium_pomdp.num_states)/medium_pomdp.num_states))
assert all(0 <= e <= initial_entropy + 0.5 for e in entropies), "Entropy outside reasonable bounds"
# Test 2: Beliefs should change over time (learning is happening)
belief_changes = [np.linalg.norm(b2 - b1) for b1, b2 in zip(beliefs_history[:-1], beliefs_history[1:])]
assert np.mean(belief_changes) > 0.001, "Beliefs not changing significantly"
# Test 3: Beliefs should remain valid probability distributions
for beliefs in beliefs_history:
assert np.allclose(beliefs.sum(), 1.0), "Beliefs not normalized"
assert np.all(beliefs >= 0), "Negative belief values found"
# Test 4: Free energy should show evidence of learning
# Compare average free energy in first and last 10 steps
early_fe = np.mean(free_energies[:10])
late_fe = np.mean(free_energies[-10:])
assert late_fe <= early_fe + 2.0, "Free energy not showing evidence of learning"
# Test 5: Belief changes should show temporal structure
# Look at the sequence of changes in 5-step windows
window_size = 5
window_changes = []
for i in range(0, len(belief_changes) - window_size, window_size):
window = belief_changes[i:i+window_size]
window_changes.append(np.mean(window))
# Test for a general decreasing trend in window changes
# Allow for more exploration by requiring only 30% of consecutive pairs to show decrease
decreasing_pairs = sum(w1 > w2 for w1, w2 in zip(window_changes[:-1], window_changes[1:]))
total_pairs = len(window_changes) - 1
assert decreasing_pairs / total_pairs >= 0.3, "Belief changes don't show consistent learning trend"
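# The next test wires up near-deterministic A and B matrices so that action 0
# reliably yields observation 0, then sets C to strongly prefer that observation.
# Assuming expected-free-energy action selection, the risk term penalises
# predicted observations that diverge from softmax(C), so action 0 (and hence
# observation 0) should dominate; the 60% threshold is a deliberately loose
# bound to tolerate sampling noise.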
def test_preference_influence(small_pomdp):
"""Test influence of preferences on action selection."""
# Set up transition dynamics to make observation 0 achievable
# Make action 0 lead to states that generate observation 0
small_pomdp.B[:,:,0] = 0.0
small_pomdp.B[0,:,0] = 1.0 # Action 0 leads to state 0
# Make action 1 lead to states that generate observation 1
small_pomdp.B[:,:,1] = 0.0
small_pomdp.B[1,:,1] = 1.0 # Action 1 leads to state 1
# Set up observation model
small_pomdp.A = np.zeros((small_pomdp.num_observations, small_pomdp.num_states))
small_pomdp.A[0,0] = 1.0 # State 0 generates observation 0
small_pomdp.A[1,1] = 1.0 # State 1 generates observation 1
small_pomdp.A[1,2] = 1.0 # State 2 generates observation 1
# Set very strong preference for observation 0
small_pomdp.C[0,:] = 5.0
small_pomdp.C[1,:] = -5.0
# Run for multiple steps
n_steps = 50
observations = []
for _ in range(n_steps):
obs, _ = small_pomdp.step()
observations.append(obs)
# Should see more preferred observations
obs_counts = np.bincount(observations)
logger.info(f"Observation counts: {obs_counts}")
# Require at least 60% preferred observations
assert obs_counts[0] > 0.6 * n_steps, \
f"Expected at least 60% observations of type 0, got {obs_counts[0]/n_steps*100:.1f}%"
def test_invalid_dimensions():
"""Test handling of invalid dimensions."""
with pytest.raises(ValueError):
GenericPOMDP(
num_observations=0, # Invalid
num_states=3,
num_actions=2
)
def test_invalid_action(small_pomdp):
"""Test handling of invalid action."""
with pytest.raises(IndexError):
small_pomdp.step(action=small_pomdp.num_actions) # Invalid action
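# With identity transitions for action 0 and a deterministic observation model,
# repeated observations of the same hidden state should progressively sharpen
# the posterior, so successive belief changes are expected to shrink. The next
# test compares the first three change norms against the last three as a coarse
# convergence check.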
def test_convergence(small_pomdp):
"""Test convergence properties."""
# Set deterministic transition and observation
small_pomdp.B[:,:,0] = np.eye(small_pomdp.num_states) # Identity transitions
# Create proper observation matrix that maps states to observations
small_pomdp.A = np.zeros((small_pomdp.num_observations, small_pomdp.num_states))
# Map each state to a unique observation if possible, otherwise map multiple states to same observation
for i in range(small_pomdp.num_states):
small_pomdp.A[i % small_pomdp.num_observations, i] = 1.0
# Normalize columns to ensure proper probability distribution
small_pomdp.A /= small_pomdp.A.sum(axis=0, keepdims=True)
# Run for multiple steps with same action
beliefs_history = []
for _ in range(10):
small_pomdp.step(action=0)
beliefs_history.append(small_pomdp.state.beliefs.copy())
# Should converge (later beliefs should be more similar)
diffs = [np.linalg.norm(b1 - b2)
for b1, b2 in zip(beliefs_history[:-1], beliefs_history[1:])]
assert np.mean(diffs[:3]) > np.mean(diffs[-3:])
def create_matrix_visualizations(pomdp, plots_dir):
"""Create and save visualizations for all matrices."""
# Individual matrix plots
# A matrix
plt.figure(figsize=(8, 6))
plt.imshow(pomdp.A, cmap='viridis', aspect='auto')
plt.colorbar(label='Probability')
plt.title('A Matrix (Observation Model)')
plt.xlabel('States')
plt.ylabel('Observations')
for i in range(pomdp.A.shape[0]):
for j in range(pomdp.A.shape[1]):
plt.text(j, i, f'{pomdp.A[i,j]:.2f}', ha='center', va='center',
color='white' if pomdp.A[i,j] > 0.5 else 'black')
plt.savefig(plots_dir / 'A_matrix.png', dpi=300, bbox_inches='tight')
plt.close()
# B matrix (one subplot per action)
n_actions = pomdp.B.shape[2]
fig, axes = plt.subplots(1, n_actions, figsize=(6*n_actions, 5))
if n_actions == 1:
axes = [axes]
for a in range(n_actions):
im = axes[a].imshow(pomdp.B[:,:,a], cmap='viridis', aspect='auto')
axes[a].set_title(f'B Matrix (Action {a})')
axes[a].set_xlabel('States t')
axes[a].set_ylabel('States t+1')
plt.colorbar(im, ax=axes[a], label='Probability')
for i in range(pomdp.B.shape[0]):
for j in range(pomdp.B.shape[1]):
axes[a].text(j, i, f'{pomdp.B[i,j,a]:.2f}', ha='center', va='center',
color='white' if pomdp.B[i,j,a] > 0.5 else 'black')
plt.tight_layout()
plt.savefig(plots_dir / 'B_matrix.png', dpi=300, bbox_inches='tight')
plt.close()
# C matrix
plt.figure(figsize=(10, 6))
vmax = max(abs(pomdp.C.min()), abs(pomdp.C.max()))
vmin = -vmax
plt.imshow(pomdp.C, cmap='RdBu_r', aspect='auto', vmin=vmin, vmax=vmax)
plt.colorbar(label='Preference')
plt.title('C Matrix (Preferences)')
plt.xlabel('Time Steps')
plt.ylabel('Observations')
for i in range(pomdp.C.shape[0]):
for j in range(pomdp.C.shape[1]):
plt.text(j, i, f'{pomdp.C[i,j]:.2f}', ha='center', va='center',
color='white' if abs(pomdp.C[i,j]) > vmax/2 else 'black')
plt.savefig(plots_dir / 'C_matrix.png', dpi=300, bbox_inches='tight')
plt.close()
# D matrix
plt.figure(figsize=(6, 4))
bars = plt.bar(range(len(pomdp.D)), pomdp.D)
plt.title('D Matrix (Prior Beliefs)')
plt.xlabel('States')
plt.ylabel('Probability')
for bar, v in zip(bars, pomdp.D):
plt.text(bar.get_x() + bar.get_width()/2, v, f'{v:.2f}',
ha='center', va='bottom')
plt.grid(True, alpha=0.3)
plt.savefig(plots_dir / 'D_matrix.png', dpi=300, bbox_inches='tight')
plt.close()
# E matrix
plt.figure(figsize=(6, 4))
bars = plt.bar(range(len(pomdp.E)), pomdp.E)
plt.title('E Matrix (Policy Prior)')
plt.xlabel('Actions')
plt.ylabel('Probability')
for bar, v in zip(bars, pomdp.E):
plt.text(bar.get_x() + bar.get_width()/2, v, f'{v:.2f}',
ha='center', va='bottom')
plt.grid(True, alpha=0.3)
plt.savefig(plots_dir / 'E_matrix.png', dpi=300, bbox_inches='tight')
plt.close()
# Combined overview plot
fig = plt.figure(figsize=(15, 12))
gs = GridSpec(3, 3, figure=fig)
# A matrix
ax1 = fig.add_subplot(gs[0, 0])
im1 = ax1.imshow(pomdp.A, cmap='viridis', aspect='auto')
ax1.set_title('A Matrix\n(Observation Model)')
plt.colorbar(im1, ax=ax1)
ax1.set_xlabel('States')
ax1.set_ylabel('Observations')
# B matrix (first action)
ax2 = fig.add_subplot(gs[0, 1:])
im2 = ax2.imshow(pomdp.B[:,:,0], cmap='viridis', aspect='auto')
ax2.set_title('B Matrix\n(Transition Model, Action 0)')
plt.colorbar(im2, ax=ax2)
ax2.set_xlabel('States t')
ax2.set_ylabel('States t+1')
# C matrix
ax3 = fig.add_subplot(gs[1, :])
vmax = max(abs(pomdp.C.min()), abs(pomdp.C.max()))
vmin = -vmax
im3 = ax3.imshow(pomdp.C, cmap='RdBu_r', aspect='auto', vmin=vmin, vmax=vmax)
ax3.set_title('C Matrix (Preferences)')
plt.colorbar(im3, ax=ax3)
ax3.set_xlabel('Time Steps')
ax3.set_ylabel('Observations')
# D matrix
ax4 = fig.add_subplot(gs[2, 0:2])
ax4.bar(range(len(pomdp.D)), pomdp.D)
ax4.set_title('D Matrix (Prior Beliefs)')
ax4.grid(True, alpha=0.3)
ax4.set_xlabel('States')
ax4.set_ylabel('Probability')
# E matrix
ax5 = fig.add_subplot(gs[2, 2])
ax5.bar(range(len(pomdp.E)), pomdp.E)
ax5.set_title('E Matrix (Policy Prior)')
ax5.grid(True, alpha=0.3)
ax5.set_xlabel('Actions')
ax5.set_ylabel('Probability')
plt.tight_layout()
plt.savefig(plots_dir / 'matrix_overview.png', dpi=300, bbox_inches='tight')
plt.close()
def test_full_simulation(configured_pomdp, config, output_dir):
"""Run a full simulation with comprehensive logging."""
try:
logger.info("\nStarting full simulation test")
# Setup output paths
test_dir = output_dir["test_results"] / "full_simulation"
test_dir.mkdir(exist_ok=True)
plots_dir = output_dir["plots"]
# Create EFE components directory
efe_dir = plots_dir / "efe_components"
efe_dir.mkdir(exist_ok=True)
# Log paths
logger.info(f"\nOutput directories:")
logger.info(f"Test results: {test_dir}")
logger.info(f"Plots: {plots_dir}")
logger.info(f"EFE Components: {efe_dir}")
# Log initial configuration
logger.info("\nModel Configuration:")
logger.info(f"Observations: {configured_pomdp.num_observations}")
logger.info(f"States: {configured_pomdp.num_states}")
logger.info(f"Actions: {configured_pomdp.num_actions}")
logger.info(f"Planning horizon: {config['dimensions']['planning_horizon']}")
logger.info(f"Total timesteps: {config['dimensions']['total_timesteps']}")
# Create matrix visualizations before simulation
create_matrix_visualizations(configured_pomdp, plots_dir)
# Run simulation
n_steps = config['dimensions']['total_timesteps']
planning_horizon = config['dimensions']['planning_horizon']
logger.info(f"\nRunning simulation for {n_steps} steps")
history = {
'observations': [],
'actions': [],
'free_energies': [],
'beliefs': [],
'efe_components': [] # Store EFE components for each policy at each timestep
}
# Store initial beliefs
history['beliefs'].append(configured_pomdp.state.beliefs.copy())
# Run simulation with progress tracking
for step in range(n_steps):
logger.info(f"\nStep {step + 1}/{n_steps}")
# Get EFE components before taking step
efe_components = configured_pomdp.get_efe_components()  # EFE components under current beliefs
history['efe_components'].append(efe_components)
# Take step
obs, fe = configured_pomdp.step()
# Log step results
logger.info(f"Observation: {obs}")
logger.info(f"Free Energy: {fe:.4f}")
logger.info(f"Updated beliefs: {configured_pomdp.state.beliefs}")
# Store history
history['observations'].append(obs)
history['actions'].append(configured_pomdp.state.history['actions'][-1])
history['free_energies'].append(fe)
history['beliefs'].append(configured_pomdp.state.beliefs.copy())
# Create EFE component visualization for this timestep
visualize_efe_components(efe_components, step, efe_dir, planning_horizon)
# Generate and save visualizations
# Belief evolution plot
plt.figure(figsize=(10, 6))
beliefs = np.array(history['beliefs'])
for i in range(configured_pomdp.num_states):
plt.plot(range(len(beliefs)), beliefs[:, i],
label=f'State {i}', linewidth=2, alpha=0.7)
plt.xlabel('Time Step')
plt.ylabel('Belief Probability')
plt.title('Belief Evolution Over Time')
plt.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.savefig(plots_dir / 'belief_evolution.png', dpi=300, bbox_inches='tight')
plt.close()
# Free energy plot
plt.figure(figsize=(10, 6))
plt.plot(range(len(history['free_energies'])), history['free_energies'],
'b-', linewidth=2, alpha=0.7)
plt.xlabel('Time Step')
plt.ylabel('Free Energy')
plt.title('Free Energy Over Time')
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.savefig(plots_dir / 'free_energy.png', dpi=300, bbox_inches='tight')
plt.close()
# Action distribution plot
plt.figure(figsize=(10, 6))
actions = np.array(history['actions'])
action_counts = np.bincount(actions, minlength=configured_pomdp.num_actions)
bars = plt.bar(range(configured_pomdp.num_actions), action_counts)
plt.xlabel('Action')
plt.ylabel('Count')
plt.title('Action Distribution')
for bar, count in zip(bars, action_counts):
plt.text(bar.get_x() + bar.get_width()/2, count, str(count),
ha='center', va='bottom')
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.savefig(plots_dir / 'action_distribution.png', dpi=300, bbox_inches='tight')
plt.close()
# Log summary statistics
logger.info("\nSimulation Summary:")
logger.info(f"Average Free Energy: {np.mean(history['free_energies']):.4f}")
logger.info(f"Observation distribution: {np.bincount(history['observations'])}")
logger.info(f"Action distribution: {np.bincount(history['actions'])}")
# Calculate belief entropy evolution
belief_entropies = [-np.sum(b * np.log(b + 1e-12)) for b in history['beliefs']]
logger.info(f"Initial belief entropy: {belief_entropies[0]:.4f}")
logger.info(f"Final belief entropy: {belief_entropies[-1]:.4f}")
# Log generated files
logger.info("\nGenerated visualization files:")
for plot_file in plots_dir.glob("*.png"):
logger.info(f"- {plot_file.name}")
# Additional assertions for matrix properties
# Test A matrix properties
assert np.allclose(configured_pomdp.A.sum(axis=0), 1.0), "A matrix columns should sum to 1"
assert np.all(configured_pomdp.A >= 0), "A matrix should be non-negative"
# Test B matrix properties
for a in range(configured_pomdp.num_actions):
assert np.allclose(configured_pomdp.B[:,:,a].sum(axis=0), 1.0), \
f"B matrix for action {a} columns should sum to 1"
assert np.all(configured_pomdp.B[:,:,a] >= 0), \
f"B matrix for action {a} should be non-negative"
# Test D matrix properties
assert np.allclose(configured_pomdp.D.sum(), 1.0), "D matrix should sum to 1"
assert np.all(configured_pomdp.D >= 0), "D matrix should be non-negative"
# Test E matrix properties
assert np.allclose(configured_pomdp.E.sum(), 1.0), "E matrix should sum to 1"
assert np.all(configured_pomdp.E >= 0), "E matrix should be non-negative"
# Check that all visualization files exist
expected_files = [
'belief_evolution.png',
'free_energy.png',
'action_distribution.png',
'A_matrix.png',
'B_matrix.png',
'C_matrix.png',
'D_matrix.png',
'E_matrix.png',
'matrix_overview.png'
]
for filename in expected_files:
assert (plots_dir / filename).exists(), f"Expected visualization file {filename} not found"
logger.info("\nFull simulation test completed successfully")
except Exception as e:
logger.error(f"\nError in full simulation test: {str(e)}")
raise
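# The helper below assumes get_efe_components() returns arrays of shape
# (num_policies, planning_horizon) for 'ambiguity', 'risk' and
# 'expected_preferences', plus a length-num_policies vector for 'total_efe';
# these are the same shapes asserted in test_policy_tree_generation.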
def visualize_efe_components(efe_components, timestep, efe_dir, planning_horizon):
"""Create visualizations for EFE components at each timestep."""
# Extract components
ambiguity = efe_components['ambiguity']
risk = efe_components['risk']
expected_preferences = efe_components['expected_preferences']
total_efe = efe_components['total_efe']
# Create figure with subplots
fig = plt.figure(figsize=(15, 10))
gs = GridSpec(2, 2, figure=fig)
# Plot ambiguity
ax1 = fig.add_subplot(gs[0, 0])
im1 = ax1.imshow(ambiguity.reshape(-1, planning_horizon), cmap='viridis', aspect='auto')
ax1.set_title('Ambiguity')
ax1.set_xlabel('Time Step')
ax1.set_ylabel('Policy')
plt.colorbar(im1, ax=ax1)
# Plot risk
ax2 = fig.add_subplot(gs[0, 1])
im2 = ax2.imshow(risk.reshape(-1, planning_horizon), cmap='viridis', aspect='auto')
ax2.set_title('Risk')
ax2.set_xlabel('Time Step')
ax2.set_ylabel('Policy')
plt.colorbar(im2, ax=ax2)
# Plot expected preferences with centered colormap
ax3 = fig.add_subplot(gs[1, 0])
vmax = np.max(np.abs(expected_preferences))
vmin = -vmax
im3 = ax3.imshow(expected_preferences.reshape(-1, planning_horizon),
cmap='RdBu_r', aspect='auto', vmin=vmin, vmax=vmax)
ax3.set_title('Expected Preferences')
ax3.set_xlabel('Time Step')
ax3.set_ylabel('Policy')
plt.colorbar(im3, ax=ax3)
# Plot total EFE as bar chart
ax4 = fig.add_subplot(gs[1, 1])
policies = range(len(total_efe))
ax4.bar(policies, total_efe)
ax4.set_title('Total Expected Free Energy')
ax4.set_xlabel('Policy')
ax4.set_ylabel('EFE')
# Adjust layout and save
plt.tight_layout()
plt.savefig(efe_dir / f'efe_components_t{timestep:03d}.png', dpi=300, bbox_inches='tight')
plt.close()
# Create policy comparison visualization
fig, ax = plt.subplots(figsize=(10, 6))
x = np.arange(len(total_efe))
width = 0.25
# Plot grouped bar chart
ax.bar(x - width, ambiguity.mean(axis=1), width, label='Ambiguity', color='blue', alpha=0.7)
ax.bar(x, risk.mean(axis=1), width, label='Risk', color='red', alpha=0.7)
ax.bar(x + width, expected_preferences.mean(axis=1), width, label='Expected Preferences', color='green', alpha=0.7)
ax.set_xlabel('Policy')
ax.set_ylabel('Component Value')
ax.set_title('EFE Components by Policy')
ax.legend()
plt.tight_layout()
plt.savefig(efe_dir / f'policy_comparison_t{timestep:03d}.png', dpi=300, bbox_inches='tight')
plt.close()
def test_full_configured_simulation(configured_pomdp, config):
"""Run a full simulation with the configured time horizon."""
logger.info("\nStarting full configured simulation test")
# Get total timesteps from config
T = config['dimensions']['total_timesteps']
logger.info(f"\nRunning simulation for {T} timesteps as configured")
# Log initial configuration
logger.info("\nModel Configuration:")
logger.info(f"Observations: {configured_pomdp.num_observations}")
logger.info(f"States: {configured_pomdp.num_states}")
logger.info(f"Actions: {configured_pomdp.num_actions}")
logger.info(f"Planning horizon: {config['dimensions']['planning_horizon']}")
logger.info(f"Total timesteps: {T}")
# Initialize history tracking
history = {
'observations': [],
'actions': [],
'free_energies': [],
'beliefs': [],
'efe_components': []  # EFE components for each policy at each timestep
}
# Store initial state
history['beliefs'].append(configured_pomdp.state.beliefs.copy())
# Run simulation for T timesteps
for step in range(T):
logger.info(f"\nTimestep {step + 1}/{T}")
# Get EFE components before taking step
efe_components = configured_pomdp.get_efe_components()
history['efe_components'].append(efe_components)
# Take step and record results
obs, fe = configured_pomdp.step()
# Log step results
logger.info(f"Observation: {obs}")
logger.info(f"Free Energy: {fe:.4f}")
logger.info(f"Updated beliefs: {configured_pomdp.state.beliefs}")
# Store history
history['observations'].append(obs)
history['actions'].append(configured_pomdp.state.history['actions'][-1])
history['free_energies'].append(fe)
history['beliefs'].append(configured_pomdp.state.beliefs.copy())
# Calculate and log summary statistics
logger.info("\nSimulation Summary:")
logger.info(f"Average Free Energy: {np.mean(history['free_energies']):.4f}")
# Observation statistics
obs_counts = np.bincount(history['observations'], minlength=configured_pomdp.num_observations)
logger.info(f"Observation distribution: {obs_counts}")
logger.info(f"Most frequent observation: {np.argmax(obs_counts)} (count: {np.max(obs_counts)})")
# Action statistics
action_counts = np.bincount(history['actions'], minlength=configured_pomdp.num_actions)
logger.info(f"Action distribution: {action_counts}")
logger.info(f"Most frequent action: {np.argmax(action_counts)} (count: {np.max(action_counts)})")
# Belief evolution
belief_entropies = [-np.sum(b * np.log(b + 1e-12)) for b in history['beliefs']]
logger.info(f"Initial belief entropy: {belief_entropies[0]:.4f}")
logger.info(f"Final belief entropy: {belief_entropies[-1]:.4f}")
logger.info(f"Belief entropy reduction: {belief_entropies[0] - belief_entropies[-1]:.4f}")
# EFE component analysis
avg_ambiguity = np.mean([comp['ambiguity'].mean() for comp in history['efe_components']])
avg_risk = np.mean([comp['risk'].mean() for comp in history['efe_components']])
avg_preferences = np.mean([comp['expected_preferences'].mean() for comp in history['efe_components']])
logger.info("\nEFE Component Analysis:")
logger.info(f"Average Ambiguity: {avg_ambiguity:.4f}")
logger.info(f"Average Risk: {avg_risk:.4f}")
logger.info(f"Average Expected Preferences: {avg_preferences:.4f}")
# Validate results
assert len(history['observations']) == T, f"Expected {T} observations"
assert len(history['actions']) == T, f"Expected {T} actions"
assert len(history['free_energies']) == T, f"Expected {T} free energy values"
assert all(np.isfinite(fe) for fe in history['free_energies']), "All free energies should be finite"
assert all(0 <= obs < configured_pomdp.num_observations
for obs in history['observations']), "All observations should be valid"
logger.info("\nFull configured simulation completed successfully")
def test_matrix_numerical_properties(configured_pomdp):
"""Test numerical properties of matrices."""
# Test condition numbers
A_cond = np.linalg.cond(configured_pomdp.A)
assert A_cond < 1e6, f"A matrix condition number too high: {A_cond}"
for a in range(configured_pomdp.num_actions):
B_cond = np.linalg.cond(configured_pomdp.B[:,:,a])
assert B_cond < 1e6, f"B matrix condition number too high for action {a}: {B_cond}"
# Test for NaN and Inf values
assert not np.any(np.isnan(configured_pomdp.A)), "A matrix contains NaN values"
assert not np.any(np.isinf(configured_pomdp.A)), "A matrix contains Inf values"
assert not np.any(np.isnan(configured_pomdp.B)), "B matrix contains NaN values"
assert not np.any(np.isinf(configured_pomdp.B)), "B matrix contains Inf values"
assert not np.any(np.isnan(configured_pomdp.C)), "C matrix contains NaN values"
assert not np.any(np.isinf(configured_pomdp.C)), "C matrix contains Inf values"
assert not np.any(np.isnan(configured_pomdp.D)), "D matrix contains NaN values"
assert not np.any(np.isinf(configured_pomdp.D)), "D matrix contains Inf values"
assert not np.any(np.isnan(configured_pomdp.E)), "E matrix contains NaN values"
assert not np.any(np.isinf(configured_pomdp.E)), "E matrix contains Inf values"
def test_matrix_consistency(configured_pomdp):
"""Test consistency of matrix dimensions and properties."""
# Test dimension consistency
assert configured_pomdp.A.shape == (configured_pomdp.num_observations, configured_pomdp.num_states), \
"A matrix dimensions inconsistent"
assert configured_pomdp.B.shape == (configured_pomdp.num_states, configured_pomdp.num_states,
configured_pomdp.num_actions), "B matrix dimensions inconsistent"
assert configured_pomdp.C.shape[0] == configured_pomdp.num_observations, \
"C matrix observation dimension inconsistent"
assert configured_pomdp.D.shape == (configured_pomdp.num_states,), \
"D matrix dimension inconsistent"
assert configured_pomdp.E.shape == (configured_pomdp.num_actions,), \
"E matrix dimension inconsistent"
# Test probability constraints
for a in range(configured_pomdp.num_actions):
# Each column in B should represent a valid transition probability distribution
assert np.all(np.abs(configured_pomdp.B[:,:,a].sum(axis=0) - 1.0) < 1e-10), \
f"B matrix for action {a} contains invalid probability distributions"
# Each column in A should represent a valid observation probability distribution
assert np.all(np.abs(configured_pomdp.A.sum(axis=0) - 1.0) < 1e-10), \
"A matrix contains invalid probability distributions"
def test_belief_update_stability(configured_pomdp):
"""Test numerical stability of belief updates under extreme conditions."""
# Test with very small beliefs
configured_pomdp.state.beliefs = np.ones(configured_pomdp.num_states) * 1e-10
configured_pomdp.state.beliefs /= configured_pomdp.state.beliefs.sum()
# Should not raise and give valid beliefs
obs, fe = configured_pomdp.step()
assert np.allclose(configured_pomdp.state.beliefs.sum(), 1.0), "Beliefs not normalized after update"
assert np.all(configured_pomdp.state.beliefs >= 0), "Negative beliefs after update"
assert np.all(np.isfinite(configured_pomdp.state.beliefs)), "Non-finite beliefs after update"
# Test with very concentrated beliefs
configured_pomdp.state.beliefs = np.zeros(configured_pomdp.num_states)
configured_pomdp.state.beliefs[0] = 1.0 - 1e-10
configured_pomdp.state.beliefs[1:] = 1e-10 / (configured_pomdp.num_states - 1)
# Should not raise and give valid beliefs
obs, fe = configured_pomdp.step()
assert np.allclose(configured_pomdp.state.beliefs.sum(), 1.0), "Beliefs not normalized after update"
assert np.all(configured_pomdp.state.beliefs >= 0), "Negative beliefs after update"
assert np.all(np.isfinite(configured_pomdp.state.beliefs)), "Non-finite beliefs after update"
def test_preference_learning(configured_pomdp):
"""Test learning with different preference configurations."""
# Store original matrices
original_C = configured_pomdp.C.copy()
original_B = configured_pomdp.B.copy()
original_A = configured_pomdp.A.copy()
try:
# Set very strong preference for first observation
configured_pomdp.C[0,:] = 5.0 # Strong positive preference for observation 0
configured_pomdp.C[1:,:] = -2.0 # Negative preference for other observations
# Modify the observation model so observation 0 is achievable:
# make state 0 very likely to generate observation 0
configured_pomdp.A[0,0] = 0.8 # High probability of observation 0 from state 0
configured_pomdp.A[1:,0] = 0.2 / (configured_pomdp.num_observations - 1) # Distribute remaining probability
# Normalize A matrix columns
configured_pomdp.A /= configured_pomdp.A.sum(axis=0, keepdims=True)
# Make action 0 lead to state 0 with high probability
for a in range(configured_pomdp.num_actions):
configured_pomdp.B[:,:,a] = np.eye(configured_pomdp.num_states) * 0.7
configured_pomdp.B[0,:,0] = 0.8 # Action 0 leads to state 0
configured_pomdp.B[1:,:,0] = 0.2 / (configured_pomdp.num_states - 1) # Distribute remaining probability
# Normalize B matrix columns
configured_pomdp.B[:,:,a] /= configured_pomdp.B[:,:,a].sum(axis=0, keepdims=True)
# Reset beliefs to uniform
configured_pomdp.state.beliefs = np.ones(configured_pomdp.num_states) / configured_pomdp.num_states
# Run simulation
n_steps = 50 # Increased number of steps
observations = []
for _ in range(n_steps):
obs, _ = configured_pomdp.step()
observations.append(obs)
# Check if preferred observations occur more frequently
obs_counts = np.bincount(observations, minlength=configured_pomdp.num_observations)
logger.info(f"Observation counts: {obs_counts}")
logger.info(f"Observation 0 count: {obs_counts[0]}")
logger.info(f"Mean of other observations: {np.mean(obs_counts[1:]):.2f}")
# Require at least 30% more observations of type 0 than average of others
assert obs_counts[0] > 1.3 * np.mean(obs_counts[1:]), \
f"Preferred observation not selected more frequently. Got {obs_counts[0]} vs mean {np.mean(obs_counts[1:]):.2f}"
finally:
# Restore original matrices
configured_pomdp.C = original_C
configured_pomdp.B = original_B
configured_pomdp.A = original_A
# Reset beliefs to uniform
configured_pomdp.state.beliefs = np.ones(configured_pomdp.num_states) / configured_pomdp.num_states
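# Policies are sequences of actions of length at most planning_horizon; the
# model is expected to enumerate such sequences (pruned to at most max_policies)
# and return per-policy, per-step EFE components alongside them, which is what
# the shape checks below exercise.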
def test_policy_tree_generation(small_pomdp):
"""Test policy tree generation and pruning."""
# Get policy tree
components = small_pomdp.get_efe_components()
policies = components['policies']
# Check basic properties
assert isinstance(policies, list)
assert all(isinstance(p, list) for p in policies)
assert all(all(isinstance(a, int) for a in p) for p in policies)
assert all(0 <= a < small_pomdp.num_actions for p in policies for a in p)
# Check policy lengths
assert all(len(p) <= small_pomdp.planning_horizon for p in policies)
# Check number of policies is bounded
assert len(policies) <= small_pomdp.max_policies
# Check policy evaluation components
assert components['ambiguity'].shape == (len(policies), small_pomdp.planning_horizon)
assert components['risk'].shape == (len(policies), small_pomdp.planning_horizon)
assert components['expected_preferences'].shape == (len(policies), small_pomdp.planning_horizon)
assert components['total_efe'].shape == (len(policies),)
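# C is indexed by planning step, so preferences may vary over the horizon. The
# next test prefers observation 0 early and observation 1 (more strongly) late,
# then checks that the lowest-EFE policy steers the final predicted observation
# distribution towards observation 1.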
def test_temporal_preference_evaluation(small_pomdp):
"""Test evaluation of preferences over time."""
# Set time-varying preferences
small_pomdp.C = np.zeros((small_pomdp.num_observations, small_pomdp.planning_horizon))
small_pomdp.C[0, 0] = 1.0 # Prefer first observation initially
small_pomdp.C[1, -1] = 2.0 # Strongly prefer second observation later
# Get policy evaluation
components = small_pomdp.get_efe_components()
expected_preferences = components['expected_preferences']
# Policies should show preference for delayed reward
best_policy_idx = np.argmin(components['total_efe'])
best_policy = components['policies'][best_policy_idx]
# Check that later actions in best policy lead to preferred observation
final_beliefs = small_pomdp.state.beliefs.copy()
for action in best_policy:
final_beliefs = small_pomdp.B[:,:,action] @ final_beliefs
final_obs_probs = small_pomdp.A @ final_beliefs
# Should have high probability for second observation
assert final_obs_probs[1] > 0.3
def test_policy_pruning(medium_pomdp):
"""Test that policy pruning maintains diversity and performance."""
# Get full policy evaluation
components = medium_pomdp.get_efe_components()
policies = components['policies']
total_efe = components['total_efe']
# Check pruning keeps best policies
best_idx = np.argmin(total_efe)
best_efe = total_efe[best_idx]
# Run with smaller max_policies
medium_pomdp.max_policies = len(policies) // 2
pruned_components = medium_pomdp.get_efe_components()
pruned_total_efe = pruned_components['total_efe']
# Best pruned policy should be close to original best
best_pruned_efe = np.min(pruned_total_efe)
assert np.abs(best_pruned_efe - best_efe) < 1.0
# Check policy diversity (not all same first action)
first_actions = [p[0] for p in pruned_components['policies']]
assert len(set(first_actions)) > 1
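# The belief-propagation test below reconstructs two EFE components by hand,
# assuming the conventions its own assertions imply:
#   ambiguity[pi, t] = H(q(s_t | pi))   (entropy of the rolled-forward beliefs)
#   risk[pi, t]      = KL(q(o_t | pi) || softmax(C[:, t]))
# where q(s_t | pi) comes from propagating beliefs through B for the actions in
# policy pi, and q(o_t | pi) = A @ q(s_t | pi).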
def test_belief_propagation(small_pomdp):
"""Test belief propagation through policy evaluation."""
# Set specific initial beliefs
small_pomdp.state.beliefs = np.zeros(small_pomdp.num_states)
small_pomdp.state.beliefs[0] = 1.0
# Get policy evaluation
components = small_pomdp.get_efe_components()
policies = components['policies']
# Check belief propagation for first policy
test_policy = policies[0]
beliefs = small_pomdp.state.beliefs.copy()
for t, action in enumerate(test_policy):
# Manual belief propagation
beliefs = small_pomdp.B[:,:,action] @ beliefs
# Get predicted observations
pred_obs = small_pomdp.A @ beliefs
# Compare with stored components
policy_idx = 0 # Testing first policy
# Check ambiguity matches entropy of beliefs
expected_ambiguity = -np.sum(beliefs * np.log(beliefs + small_pomdp.stability_threshold))
assert np.abs(components['ambiguity'][policy_idx, t] - expected_ambiguity) < 1e-5
# Check risk computation
expected_risk = small_pomdp._compute_kl_divergence(
pred_obs,
small_pomdp._softmax(small_pomdp.C[:, t])
)
assert np.abs(components['risk'][policy_idx, t] - expected_risk) < 1e-5
if __name__ == '__main__':
# When run directly, execute pytest with full output (sys is imported at module level)
sys.exit(pytest.main([__file__, "-v", "--capture=tee-sys"]))