"""
|
|
Tests for Generic POMDP implementation.
|
|
"""
|
|
|
|
import matplotlib
|
|
matplotlib.use('Agg') # Use non-interactive backend
|
|
import matplotlib.pyplot as plt
|
|
import pytest
|
|
import numpy as np
|
|
import yaml
|
|
import logging
|
|
import sys
|
|
from pathlib import Path
|
|
from datetime import datetime
|
|
from generic_pomdp import GenericPOMDP, ModelState
|
|
from matplotlib.gridspec import GridSpec
|
|
|
|
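# The tests below exercise the following GenericPOMDP surface (inferred from
# this file itself; treat it as the assumed interface, not a verified
# specification of the implementation):
#   A (num_observations x num_states)          observation model
#   B (num_states x num_states x num_actions)  transition model
#   C (num_observations x planning_horizon)    preferences
#   D (num_states,)                            prior beliefs
#   E (num_actions,)                           policy prior
#   state: ModelState with .beliefs, .time_step, .current_state, .history
#   step(action=None) -> (observation, free_energy)
#   get_efe_components() -> dict with 'policies', 'ambiguity', 'risk',
#       'expected_preferences', 'total_efe'
#   save_state(path) / load_state(path), _softmax, _compute_kl_divergence,
#   stability_threshold, max_policies, planning_horizon
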
# Configure comprehensive logging
def setup_logging():
    """Configure logging to both file and console with detailed formatting."""
    # Create logs directory if it doesn't exist
    log_dir = Path(__file__).parent / "Output/logs"
    log_dir.mkdir(parents=True, exist_ok=True)

    # Create timestamped log file
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    log_file = log_dir / f"test_run_{timestamp}.log"

    # Configure root logger
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.INFO)

    # Detailed formatter
    formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )

    # File handler with full output
    file_handler = logging.FileHandler(log_file)
    file_handler.setLevel(logging.INFO)
    file_handler.setFormatter(formatter)
    root_logger.addHandler(file_handler)

    # Console handler mirroring the same output to stdout
    console_handler = logging.StreamHandler(sys.stdout)
    console_handler.setLevel(logging.INFO)
    console_handler.setFormatter(formatter)
    root_logger.addHandler(console_handler)

    return log_file


# Setup logging before tests run
log_file = setup_logging()
logger = logging.getLogger(__name__)

@pytest.fixture
def config():
    """Load configuration file."""
    config_path = Path(__file__).parent / "configuration.yaml"
    with open(config_path, 'r') as f:
        return yaml.safe_load(f)


@pytest.fixture
def configured_pomdp(config):
    """Create POMDP from configuration file."""
    dims = config['dimensions']
    return GenericPOMDP(
        num_observations=dims['observations'],
        num_states=dims['states'],
        num_actions=dims['actions'],
        planning_horizon=dims['planning_horizon']
    )

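# For reference, configuration.yaml is assumed to have the shape sketched
# below. The keys are exactly the ones read in this file; the numeric values
# are purely illustrative, except planning_horizon=4, which the fixtures
# below note they match:
#
#   dimensions:
#     observations: 3
#     states: 3
#     actions: 2
#     planning_horizon: 4
#     total_timesteps: 20
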
@pytest.fixture
def small_pomdp():
    """Create small POMDP for testing."""
    return GenericPOMDP(
        num_observations=2,
        num_states=3,
        num_actions=2,
        planning_horizon=4  # Matches planning_horizon in configuration.yaml
    )


@pytest.fixture
def medium_pomdp():
    """Create medium POMDP for testing."""
    return GenericPOMDP(
        num_observations=4,
        num_states=5,
        num_actions=3,
        planning_horizon=4  # Matches planning_horizon in configuration.yaml
    )

@pytest.fixture(scope="session", autouse=True)
def log_global_test_info(request):
    """Log global test information at start and end of test session."""
    logger.info("=" * 80)
    logger.info("Starting test session")
    logger.info(f"Log file: {log_file}")
    logger.info(f"Python version: {sys.version}")
    logger.info("=" * 80)

    yield

    logger.info("=" * 80)
    logger.info("Test session completed")
    logger.info("=" * 80)


@pytest.fixture(scope="session")
def output_dir():
    """Create and return output directory structure."""
    base_dir = Path(__file__).parent / "Output"
    dirs = {
        "logs": base_dir / "logs",
        "plots": base_dir / "plots",
        "test_results": base_dir / "test_results",
        "simulations": base_dir / "simulations"
    }

    # Create all directories
    for dir_path in dirs.values():
        dir_path.mkdir(parents=True, exist_ok=True)

    return dirs

def test_configuration_dimensions(config):
    """Test configuration file dimensions are valid."""
    dims = config['dimensions']
    logger.info("Testing configuration dimensions:")
    logger.info(f"Observations: {dims['observations']}")
    logger.info(f"States: {dims['states']}")
    logger.info(f"Actions: {dims['actions']}")
    logger.info(f"Planning horizon: {dims['planning_horizon']}")
    logger.info(f"Total timesteps: {dims['total_timesteps']}")

    # Validate dimensions are positive integers
    assert dims['observations'] > 0, "Number of observations must be positive"
    assert dims['states'] > 0, "Number of states must be positive"
    assert dims['actions'] > 0, "Number of actions must be positive"
    assert dims['planning_horizon'] > 0, "Planning horizon must be positive"
    assert dims['total_timesteps'] > 0, "Total timesteps must be positive"

    logger.info("All configuration dimensions validated successfully")

def test_initialization(configured_pomdp, config):
    """Test model initialization and dimensionality."""
    dims = config['dimensions']

    logger.info("Checking model dimensions match configuration:")

    # Check dimensions
    assert configured_pomdp.num_observations == dims['observations'], \
        f"Expected {dims['observations']} observations, got {configured_pomdp.num_observations}"
    logger.info(f"✓ Observations dimension: {configured_pomdp.num_observations}")

    assert configured_pomdp.num_states == dims['states'], \
        f"Expected {dims['states']} states, got {configured_pomdp.num_states}"
    logger.info(f"✓ States dimension: {configured_pomdp.num_states}")

    assert configured_pomdp.num_actions == dims['actions'], \
        f"Expected {dims['actions']} actions, got {configured_pomdp.num_actions}"
    logger.info(f"✓ Actions dimension: {configured_pomdp.num_actions}")

    # Check matrix shapes
    logger.info("\nValidating matrix shapes:")

    A_shape = configured_pomdp.A.shape
    expected_A = (dims['observations'], dims['states'])
    assert A_shape == expected_A, f"A matrix shape mismatch. Expected {expected_A}, got {A_shape}"
    logger.info(f"✓ A matrix shape: {A_shape}")

    B_shape = configured_pomdp.B.shape
    expected_B = (dims['states'], dims['states'], dims['actions'])
    assert B_shape == expected_B, f"B matrix shape mismatch. Expected {expected_B}, got {B_shape}"
    logger.info(f"✓ B matrix shape: {B_shape}")

    C_shape = configured_pomdp.C.shape
    expected_C = (dims['observations'], dims['planning_horizon'])
    assert C_shape == expected_C, f"C matrix shape mismatch. Expected {expected_C}, got {C_shape}"
    logger.info(f"✓ C matrix shape: {C_shape}")

    D_shape = configured_pomdp.D.shape
    expected_D = (dims['states'],)
    assert D_shape == expected_D, f"D matrix shape mismatch. Expected {expected_D}, got {D_shape}"
    logger.info(f"✓ D matrix shape: {D_shape}")

    E_shape = configured_pomdp.E.shape
    expected_E = (dims['actions'],)
    assert E_shape == expected_E, f"E matrix shape mismatch. Expected {expected_E}, got {E_shape}"
    logger.info(f"✓ E matrix shape: {E_shape}")

    # Check initial state
    assert isinstance(configured_pomdp.state, ModelState)
    assert configured_pomdp.state.beliefs.shape == (dims['states'],)
    assert configured_pomdp.state.time_step == 0
    logger.info("\n✓ Initial state validated successfully")

    logger.info("\nAll initialization checks passed successfully")

def test_matrix_properties(small_pomdp):
    """Test matrix properties."""
    # A matrix (observation model)
    assert np.allclose(small_pomdp.A.sum(axis=0), 1.0)  # Column stochastic
    assert np.all(small_pomdp.A >= 0)  # Non-negative

    # B matrix (transition model)
    for a in range(small_pomdp.num_actions):
        assert np.allclose(small_pomdp.B[:, :, a].sum(axis=0), 1.0)  # Column stochastic
        assert np.all(small_pomdp.B[:, :, a] >= 0)  # Non-negative

    # C matrix (preferences)
    assert np.all(np.isfinite(small_pomdp.C))  # Finite values

    # D matrix (prior beliefs)
    assert np.allclose(small_pomdp.D.sum(), 1.0)  # Normalized
    assert np.all(small_pomdp.D >= 0)  # Non-negative

    # E matrix (policy prior)
    assert np.allclose(small_pomdp.E.sum(), 1.0)  # Normalized
    assert np.all(small_pomdp.E >= 0)  # Non-negative

def test_step_without_action(small_pomdp):
    """Test stepping without providing action."""
    observation, free_energy = small_pomdp.step()

    # Check observation
    assert 0 <= observation < small_pomdp.num_observations

    # Check free energy
    assert np.isfinite(free_energy)

    # Check history updated
    assert len(small_pomdp.state.history['observations']) == 1
    assert len(small_pomdp.state.history['actions']) == 1
    assert len(small_pomdp.state.history['beliefs']) == 2  # Initial + updated
    assert len(small_pomdp.state.history['free_energy']) == 1

def test_step_with_action(small_pomdp):
    """Test stepping with provided action."""
    action = 0
    observation, free_energy = small_pomdp.step(action)

    # Check observation
    assert 0 <= observation < small_pomdp.num_observations

    # Check free energy
    assert np.isfinite(free_energy)

    # Check correct action taken
    assert small_pomdp.state.history['actions'][-1] == action

def test_belief_updating(small_pomdp):
    """Test belief updating mechanism."""
    initial_beliefs = small_pomdp.state.beliefs.copy()

    # Take step
    small_pomdp.step()

    # Check beliefs updated
    assert not np.allclose(small_pomdp.state.beliefs, initial_beliefs)
    assert np.allclose(small_pomdp.state.beliefs.sum(), 1.0)  # Still normalized
    assert np.all(small_pomdp.state.beliefs >= 0)  # Still non-negative

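# Illustrative sketch of the belief update that test_belief_updating
# exercises. This is an assumption about GenericPOMDP's internals (a standard
# Bayesian POMDP filter), not a verified copy of the implementation, and it
# is not called by the tests.
def _reference_belief_update(beliefs, A, B, action, observation, eps=1e-12):
    """Bayes-style update: posterior ∝ A[o, :] * (B[:, :, a] @ prior)."""
    predicted = B[:, :, action] @ beliefs       # push beliefs through transitions
    posterior = A[observation, :] * predicted   # weight by observation likelihood
    return posterior / (posterior.sum() + eps)  # renormalize to a distribution
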
def test_action_selection(small_pomdp):
    """Test that action selection uses policy evaluation properly."""
    # Run action selection multiple times
    n_samples = 100
    selected_actions = []
    action_probs_list = []

    for _ in range(n_samples):
        # Get EFE components to compute action values
        efe_components = small_pomdp.get_efe_components()
        policies = efe_components['policies']
        total_efe = efe_components['total_efe']

        # Group policies by their first action and take the minimum EFE for each first action
        action_values = np.full(small_pomdp.num_actions, np.inf)
        for action_idx in range(small_pomdp.num_actions):
            # Find policies that start with this action
            matching_policies = [i for i, p in enumerate(policies) if p[0] == action_idx]
            if matching_policies:
                # Take the minimum EFE among policies starting with this action
                action_values[action_idx] = np.min(total_efe[matching_policies])

        # Get action probabilities: softmax over negated EFE, with a small
        # temperature floor so the division is always well defined
        action_probs = small_pomdp._softmax(-action_values / (0.001 + small_pomdp.stability_threshold))
        action = np.random.choice(len(action_probs), p=action_probs)

        selected_actions.append(action)
        action_probs_list.append(action_probs)

    # Convert to numpy array for easier analysis
    selected_actions = np.array(selected_actions)

    # Compute empirical action frequencies
    action_counts = np.bincount(selected_actions, minlength=small_pomdp.num_actions)
    empirical_frequencies = action_counts / n_samples

    # Average action probabilities across samples
    avg_action_probs = np.mean(action_probs_list, axis=0)

    # Check that empirical frequencies roughly match expected probabilities
    np.testing.assert_allclose(empirical_frequencies, avg_action_probs, atol=0.1)

    # Additional assertions to verify action selection properties
    assert len(selected_actions) == n_samples, "Wrong number of selected actions"
    assert all(0 <= a < small_pomdp.num_actions for a in selected_actions), "Invalid action selected"
    assert all(np.isclose(sum(probs), 1.0) for probs in action_probs_list), "Action probabilities don't sum to 1"

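# Minimal sketch of the numerically stable softmax that the call to
# small_pomdp._softmax above is assumed to implement; subtracting the maximum
# before exponentiating avoids overflow. Illustrative only; not used by the
# tests.
def _reference_softmax(x):
    """Stable softmax over a 1-D array."""
    z = np.exp(x - np.max(x))  # shift by max so exp never overflows
    return z / z.sum()
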
def test_save_load_state(small_pomdp, tmp_path):
    """Test state saving and loading."""
    # Take some steps
    for _ in range(3):
        small_pomdp.step()

    # Save state
    save_path = tmp_path / "test_state.yaml"
    small_pomdp.save_state(save_path)

    # Create new model
    new_model = GenericPOMDP(
        num_observations=2,
        num_states=3,
        num_actions=2,
        planning_horizon=4  # Matches planning_horizon in configuration.yaml
    )

    # Load state
    new_model.load_state(save_path)

    # Check state loaded correctly
    assert new_model.state.current_state == small_pomdp.state.current_state
    assert np.allclose(new_model.state.beliefs, small_pomdp.state.beliefs)
    assert new_model.state.time_step == small_pomdp.state.time_step

    # Check history loaded correctly
    for key in small_pomdp.state.history:
        assert len(new_model.state.history[key]) == len(small_pomdp.state.history[key])
        for i in range(len(small_pomdp.state.history[key])):
            if isinstance(small_pomdp.state.history[key][i], dict):
                # For efe_components dictionaries
                for k, v in small_pomdp.state.history[key][i].items():
                    if isinstance(v, (np.ndarray, list)):
                        assert np.allclose(np.array(new_model.state.history[key][i][k]), np.array(v))
                    else:
                        assert new_model.state.history[key][i][k] == v
            elif isinstance(small_pomdp.state.history[key][i], (np.ndarray, list)):
                assert np.allclose(np.array(new_model.state.history[key][i]),
                                   np.array(small_pomdp.state.history[key][i]))
            else:
                assert new_model.state.history[key][i] == small_pomdp.state.history[key][i]

def test_numerical_stability(small_pomdp):
    """Test numerical stability of computations."""
    # Set very small beliefs
    small_pomdp.state.beliefs = np.ones(small_pomdp.num_states) * 1e-10
    small_pomdp.state.beliefs /= small_pomdp.state.beliefs.sum()

    # Should not raise and should give finite results
    observation, free_energy = small_pomdp.step()
    assert np.isfinite(free_energy)
    assert np.all(np.isfinite(small_pomdp.state.beliefs))

def test_learning_dynamics(medium_pomdp):
    """Test learning dynamics over multiple steps."""
    # Track belief entropy, beliefs and free energy
    entropies = []
    beliefs_history = []
    free_energies = []

    # Run for multiple steps
    n_steps = 50  # Enough steps for learning effects to show up
    for _ in range(n_steps):
        obs, fe = medium_pomdp.step()
        beliefs = medium_pomdp.state.beliefs.copy()
        beliefs_history.append(beliefs)
        entropy = -np.sum(beliefs * np.log(beliefs + medium_pomdp.stability_threshold))
        entropies.append(entropy)
        free_energies.append(fe)

    # Test 1: Entropy should stay within reasonable bounds.
    # The entropy of the uniform prior, log(num_states), is the maximum possible.
    initial_entropy = -np.sum(np.ones(medium_pomdp.num_states) / medium_pomdp.num_states *
                              np.log(np.ones(medium_pomdp.num_states) / medium_pomdp.num_states))
    assert all(0 <= e <= initial_entropy + 0.5 for e in entropies), "Entropy outside reasonable bounds"

    # Test 2: Beliefs should change over time (learning is happening)
    belief_changes = [np.linalg.norm(b2 - b1) for b1, b2 in zip(beliefs_history[:-1], beliefs_history[1:])]
    assert np.mean(belief_changes) > 0.001, "Beliefs not changing significantly"

    # Test 3: Beliefs should remain valid probability distributions
    for beliefs in beliefs_history:
        assert np.allclose(beliefs.sum(), 1.0), "Beliefs not normalized"
        assert np.all(beliefs >= 0), "Negative belief values found"

    # Test 4: Free energy should show evidence of learning.
    # Compare average free energy in the first and last 10 steps.
    early_fe = np.mean(free_energies[:10])
    late_fe = np.mean(free_energies[-10:])
    assert late_fe <= early_fe + 2.0, "Free energy not showing evidence of learning"

    # Test 5: Belief changes should show temporal structure.
    # Look at the sequence of changes in 5-step windows.
    window_size = 5
    window_changes = []
    for i in range(0, len(belief_changes) - window_size, window_size):
        window = belief_changes[i:i + window_size]
        window_changes.append(np.mean(window))

    # Test for a general decreasing trend in window changes, allowing for
    # exploration by requiring only 30% of consecutive pairs to decrease.
    decreasing_pairs = sum(w1 > w2 for w1, w2 in zip(window_changes[:-1], window_changes[1:]))
    total_pairs = len(window_changes) - 1
    assert decreasing_pairs / total_pairs >= 0.3, "Belief changes don't show consistent learning trend"

def test_preference_influence(small_pomdp):
    """Test influence of preferences on action selection."""
    # Set up transition dynamics to make observation 0 achievable.
    # Make action 0 lead to states that generate observation 0.
    small_pomdp.B[:, :, 0] = 0.0
    small_pomdp.B[0, :, 0] = 1.0  # Action 0 leads to state 0

    # Make action 1 lead to states that generate observation 1
    small_pomdp.B[:, :, 1] = 0.0
    small_pomdp.B[1, :, 1] = 1.0  # Action 1 leads to state 1

    # Set up observation model
    small_pomdp.A = np.zeros((small_pomdp.num_observations, small_pomdp.num_states))
    small_pomdp.A[0, 0] = 1.0  # State 0 generates observation 0
    small_pomdp.A[1, 1] = 1.0  # State 1 generates observation 1
    small_pomdp.A[1, 2] = 1.0  # State 2 generates observation 1

    # Set very strong preference for observation 0
    small_pomdp.C[0, :] = 5.0
    small_pomdp.C[1, :] = -5.0

    # Run for multiple steps
    n_steps = 50
    observations = []
    for _ in range(n_steps):
        obs, _ = small_pomdp.step()
        observations.append(obs)

    # Should see more preferred observations
    obs_counts = np.bincount(observations)
    logger.info(f"Observation counts: {obs_counts}")
    # Require at least 60% preferred observations
    assert obs_counts[0] > 0.6 * n_steps, \
        f"Expected at least 60% observations of type 0, got {obs_counts[0] / n_steps * 100:.1f}%"

def test_invalid_dimensions():
    """Test handling of invalid dimensions."""
    with pytest.raises(ValueError):
        GenericPOMDP(
            num_observations=0,  # Invalid
            num_states=3,
            num_actions=2
        )


def test_invalid_action(small_pomdp):
    """Test handling of invalid action."""
    with pytest.raises(IndexError):
        small_pomdp.step(action=small_pomdp.num_actions)  # Invalid: actions are 0..num_actions-1

def test_convergence(small_pomdp):
    """Test convergence properties."""
    # Set deterministic transition and observation
    small_pomdp.B[:, :, 0] = np.eye(small_pomdp.num_states)  # Identity transitions

    # Create proper observation matrix that maps states to observations
    small_pomdp.A = np.zeros((small_pomdp.num_observations, small_pomdp.num_states))
    # Map each state to a unique observation if possible, otherwise map multiple states to the same observation
    for i in range(small_pomdp.num_states):
        small_pomdp.A[i % small_pomdp.num_observations, i] = 1.0

    # Normalize columns to ensure proper probability distributions
    small_pomdp.A /= small_pomdp.A.sum(axis=0, keepdims=True)

    # Run for multiple steps with the same action
    beliefs_history = []
    for _ in range(10):
        small_pomdp.step(action=0)
        beliefs_history.append(small_pomdp.state.beliefs.copy())

    # Should converge (later beliefs should be more similar)
    diffs = [np.linalg.norm(b1 - b2)
             for b1, b2 in zip(beliefs_history[:-1], beliefs_history[1:])]
    assert np.mean(diffs[:3]) > np.mean(diffs[-3:])

def create_matrix_visualizations(pomdp, plots_dir):
    """Create and save visualizations for all matrices."""
    # Individual matrix plots

    # A matrix
    plt.figure(figsize=(8, 6))
    plt.imshow(pomdp.A, cmap='viridis', aspect='auto')
    plt.colorbar(label='Probability')
    plt.title('A Matrix (Observation Model)')
    plt.xlabel('States')
    plt.ylabel('Observations')
    for i in range(pomdp.A.shape[0]):
        for j in range(pomdp.A.shape[1]):
            plt.text(j, i, f'{pomdp.A[i, j]:.2f}', ha='center', va='center',
                     color='white' if pomdp.A[i, j] > 0.5 else 'black')
    plt.savefig(plots_dir / 'A_matrix.png', dpi=300, bbox_inches='tight')
    plt.close()

    # B matrix (one subplot per action)
    n_actions = pomdp.B.shape[2]
    fig, axes = plt.subplots(1, n_actions, figsize=(6 * n_actions, 5))
    if n_actions == 1:
        axes = [axes]
    for a in range(n_actions):
        im = axes[a].imshow(pomdp.B[:, :, a], cmap='viridis', aspect='auto')
        axes[a].set_title(f'B Matrix (Action {a})')
        axes[a].set_xlabel('States t')
        axes[a].set_ylabel('States t+1')
        plt.colorbar(im, ax=axes[a], label='Probability')
        for i in range(pomdp.B.shape[0]):
            for j in range(pomdp.B.shape[1]):
                axes[a].text(j, i, f'{pomdp.B[i, j, a]:.2f}', ha='center', va='center',
                             color='white' if pomdp.B[i, j, a] > 0.5 else 'black')
    plt.tight_layout()
    plt.savefig(plots_dir / 'B_matrix.png', dpi=300, bbox_inches='tight')
    plt.close()

    # C matrix
    plt.figure(figsize=(10, 6))
    vmax = max(abs(pomdp.C.min()), abs(pomdp.C.max()))
    vmin = -vmax
    plt.imshow(pomdp.C, cmap='RdBu_r', aspect='auto', vmin=vmin, vmax=vmax)
    plt.colorbar(label='Preference')
    plt.title('C Matrix (Preferences)')
    plt.xlabel('Time Steps')
    plt.ylabel('Observations')
    for i in range(pomdp.C.shape[0]):
        for j in range(pomdp.C.shape[1]):
            plt.text(j, i, f'{pomdp.C[i, j]:.2f}', ha='center', va='center',
                     color='white' if abs(pomdp.C[i, j]) > vmax / 2 else 'black')
    plt.savefig(plots_dir / 'C_matrix.png', dpi=300, bbox_inches='tight')
    plt.close()

    # D matrix
    plt.figure(figsize=(6, 4))
    bars = plt.bar(range(len(pomdp.D)), pomdp.D)
    plt.title('D Matrix (Prior Beliefs)')
    plt.xlabel('States')
    plt.ylabel('Probability')
    for bar, v in zip(bars, pomdp.D):
        plt.text(bar.get_x() + bar.get_width() / 2, v, f'{v:.2f}',
                 ha='center', va='bottom')
    plt.grid(True, alpha=0.3)
    plt.savefig(plots_dir / 'D_matrix.png', dpi=300, bbox_inches='tight')
    plt.close()

    # E matrix
    plt.figure(figsize=(6, 4))
    bars = plt.bar(range(len(pomdp.E)), pomdp.E)
    plt.title('E Matrix (Policy Prior)')
    plt.xlabel('Actions')
    plt.ylabel('Probability')
    for bar, v in zip(bars, pomdp.E):
        plt.text(bar.get_x() + bar.get_width() / 2, v, f'{v:.2f}',
                 ha='center', va='bottom')
    plt.grid(True, alpha=0.3)
    plt.savefig(plots_dir / 'E_matrix.png', dpi=300, bbox_inches='tight')
    plt.close()

    # Combined overview plot
    fig = plt.figure(figsize=(15, 12))
    gs = GridSpec(3, 3, figure=fig)

    # A matrix
    ax1 = fig.add_subplot(gs[0, 0])
    im1 = ax1.imshow(pomdp.A, cmap='viridis', aspect='auto')
    ax1.set_title('A Matrix\n(Observation Model)')
    plt.colorbar(im1, ax=ax1)
    ax1.set_xlabel('States')
    ax1.set_ylabel('Observations')

    # B matrix (first action)
    ax2 = fig.add_subplot(gs[0, 1:])
    im2 = ax2.imshow(pomdp.B[:, :, 0], cmap='viridis', aspect='auto')
    ax2.set_title('B Matrix\n(Transition Model, Action 0)')
    plt.colorbar(im2, ax=ax2)
    ax2.set_xlabel('States t')
    ax2.set_ylabel('States t+1')

    # C matrix
    ax3 = fig.add_subplot(gs[1, :])
    vmax = max(abs(pomdp.C.min()), abs(pomdp.C.max()))
    vmin = -vmax
    im3 = ax3.imshow(pomdp.C, cmap='RdBu_r', aspect='auto', vmin=vmin, vmax=vmax)
    ax3.set_title('C Matrix (Preferences)')
    plt.colorbar(im3, ax=ax3)
    ax3.set_xlabel('Time Steps')
    ax3.set_ylabel('Observations')

    # D matrix
    ax4 = fig.add_subplot(gs[2, 0:2])
    ax4.bar(range(len(pomdp.D)), pomdp.D)
    ax4.set_title('D Matrix (Prior Beliefs)')
    ax4.grid(True, alpha=0.3)
    ax4.set_xlabel('States')
    ax4.set_ylabel('Probability')

    # E matrix
    ax5 = fig.add_subplot(gs[2, 2])
    ax5.bar(range(len(pomdp.E)), pomdp.E)
    ax5.set_title('E Matrix (Policy Prior)')
    ax5.grid(True, alpha=0.3)
    ax5.set_xlabel('Actions')
    ax5.set_ylabel('Probability')

    plt.tight_layout()
    plt.savefig(plots_dir / 'matrix_overview.png', dpi=300, bbox_inches='tight')
    plt.close()

def test_full_simulation(configured_pomdp, config, output_dir):
    """Run a full simulation with comprehensive logging."""
    try:
        logger.info("\nStarting full simulation test")

        # Setup output paths
        test_dir = output_dir["test_results"] / "full_simulation"
        test_dir.mkdir(exist_ok=True)
        plots_dir = output_dir["plots"]

        # Create EFE components directory
        efe_dir = plots_dir / "efe_components"
        efe_dir.mkdir(exist_ok=True)

        # Log paths
        logger.info("\nOutput directories:")
        logger.info(f"Test results: {test_dir}")
        logger.info(f"Plots: {plots_dir}")
        logger.info(f"EFE Components: {efe_dir}")

        # Log initial configuration
        logger.info("\nModel Configuration:")
        logger.info(f"Observations: {configured_pomdp.num_observations}")
        logger.info(f"States: {configured_pomdp.num_states}")
        logger.info(f"Actions: {configured_pomdp.num_actions}")
        logger.info(f"Planning horizon: {config['dimensions']['planning_horizon']}")
        logger.info(f"Total timesteps: {config['dimensions']['total_timesteps']}")

        # Create matrix visualizations before simulation
        create_matrix_visualizations(configured_pomdp, plots_dir)

        # Run simulation
        n_steps = config['dimensions']['total_timesteps']
        planning_horizon = config['dimensions']['planning_horizon']
        logger.info(f"\nRunning simulation for {n_steps} steps")

        history = {
            'observations': [],
            'actions': [],
            'free_energies': [],
            'beliefs': [],
            'efe_components': []  # Store EFE components for each policy at each timestep
        }

        # Store initial beliefs
        history['beliefs'].append(configured_pomdp.state.beliefs.copy())

        # Run simulation with progress tracking
        for step in range(n_steps):
            logger.info(f"\nStep {step + 1}/{n_steps}")

            # Get EFE components before taking step
            efe_components = configured_pomdp.get_efe_components()
            history['efe_components'].append(efe_components)

            # Take step
            obs, fe = configured_pomdp.step()

            # Log step results
            logger.info(f"Observation: {obs}")
            logger.info(f"Free Energy: {fe:.4f}")
            logger.info(f"Updated beliefs: {configured_pomdp.state.beliefs}")

            # Store history
            history['observations'].append(obs)
            history['actions'].append(configured_pomdp.state.history['actions'][-1])
            history['free_energies'].append(fe)
            history['beliefs'].append(configured_pomdp.state.beliefs.copy())

            # Create EFE component visualization for this timestep
            visualize_efe_components(efe_components, step, efe_dir, planning_horizon)

        # Generate and save visualizations
        # Belief evolution plot
        plt.figure(figsize=(10, 6))
        beliefs = np.array(history['beliefs'])
        for i in range(configured_pomdp.num_states):
            plt.plot(range(len(beliefs)), beliefs[:, i],
                     label=f'State {i}', linewidth=2, alpha=0.7)
        plt.xlabel('Time Step')
        plt.ylabel('Belief Probability')
        plt.title('Belief Evolution Over Time')
        plt.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
        plt.grid(True, alpha=0.3)
        plt.tight_layout()
        plt.savefig(plots_dir / 'belief_evolution.png', dpi=300, bbox_inches='tight')
        plt.close()

        # Free energy plot
        plt.figure(figsize=(10, 6))
        plt.plot(range(len(history['free_energies'])), history['free_energies'],
                 'b-', linewidth=2, alpha=0.7)
        plt.xlabel('Time Step')
        plt.ylabel('Free Energy')
        plt.title('Free Energy Over Time')
        plt.grid(True, alpha=0.3)
        plt.tight_layout()
        plt.savefig(plots_dir / 'free_energy.png', dpi=300, bbox_inches='tight')
        plt.close()

        # Action distribution plot
        plt.figure(figsize=(10, 6))
        actions = np.array(history['actions'])
        action_counts = np.bincount(actions, minlength=configured_pomdp.num_actions)
        bars = plt.bar(range(configured_pomdp.num_actions), action_counts)
        plt.xlabel('Action')
        plt.ylabel('Count')
        plt.title('Action Distribution')
        for bar, count in zip(bars, action_counts):
            plt.text(bar.get_x() + bar.get_width() / 2, count, str(count),
                     ha='center', va='bottom')
        plt.grid(True, alpha=0.3)
        plt.tight_layout()
        plt.savefig(plots_dir / 'action_distribution.png', dpi=300, bbox_inches='tight')
        plt.close()

        # Log summary statistics
        logger.info("\nSimulation Summary:")
        logger.info(f"Average Free Energy: {np.mean(history['free_energies']):.4f}")
        logger.info(f"Observation distribution: {np.bincount(history['observations'])}")
        logger.info(f"Action distribution: {np.bincount(history['actions'])}")

        # Calculate belief entropy evolution
        belief_entropies = [-np.sum(b * np.log(b + 1e-12)) for b in history['beliefs']]
        logger.info(f"Initial belief entropy: {belief_entropies[0]:.4f}")
        logger.info(f"Final belief entropy: {belief_entropies[-1]:.4f}")

        # Log generated files
        logger.info("\nGenerated visualization files:")
        for plot_file in plots_dir.glob("*.png"):
            logger.info(f"- {plot_file.name}")

        # Additional assertions for matrix properties
        # Test A matrix properties
        assert np.allclose(configured_pomdp.A.sum(axis=0), 1.0), "A matrix columns should sum to 1"
        assert np.all(configured_pomdp.A >= 0), "A matrix should be non-negative"

        # Test B matrix properties
        for a in range(configured_pomdp.num_actions):
            assert np.allclose(configured_pomdp.B[:, :, a].sum(axis=0), 1.0), \
                f"B matrix for action {a} columns should sum to 1"
            assert np.all(configured_pomdp.B[:, :, a] >= 0), \
                f"B matrix for action {a} should be non-negative"

        # Test D matrix properties
        assert np.allclose(configured_pomdp.D.sum(), 1.0), "D matrix should sum to 1"
        assert np.all(configured_pomdp.D >= 0), "D matrix should be non-negative"

        # Test E matrix properties
        assert np.allclose(configured_pomdp.E.sum(), 1.0), "E matrix should sum to 1"
        assert np.all(configured_pomdp.E >= 0), "E matrix should be non-negative"

        # Check that all visualization files exist
        expected_files = [
            'belief_evolution.png',
            'free_energy.png',
            'action_distribution.png',
            'A_matrix.png',
            'B_matrix.png',
            'C_matrix.png',
            'D_matrix.png',
            'E_matrix.png',
            'matrix_overview.png'
        ]
        for filename in expected_files:
            assert (plots_dir / filename).exists(), f"Expected visualization file {filename} not found"

        logger.info("\nFull simulation test completed successfully")

    except Exception as e:
        logger.error(f"\nError in full simulation test: {str(e)}")
        raise

def visualize_efe_components(efe_components, timestep, efe_dir, planning_horizon):
    """Create visualizations for EFE components at each timestep."""
    # Extract components
    ambiguity = efe_components['ambiguity']
    risk = efe_components['risk']
    expected_preferences = efe_components['expected_preferences']
    total_efe = efe_components['total_efe']

    # Create figure with subplots
    fig = plt.figure(figsize=(15, 10))
    gs = GridSpec(2, 2, figure=fig)

    # Plot ambiguity
    ax1 = fig.add_subplot(gs[0, 0])
    im1 = ax1.imshow(ambiguity.reshape(-1, planning_horizon), cmap='viridis', aspect='auto')
    ax1.set_title('Ambiguity')
    ax1.set_xlabel('Time Step')
    ax1.set_ylabel('Policy')
    plt.colorbar(im1, ax=ax1)

    # Plot risk
    ax2 = fig.add_subplot(gs[0, 1])
    im2 = ax2.imshow(risk.reshape(-1, planning_horizon), cmap='viridis', aspect='auto')
    ax2.set_title('Risk')
    ax2.set_xlabel('Time Step')
    ax2.set_ylabel('Policy')
    plt.colorbar(im2, ax=ax2)

    # Plot expected preferences with centered colormap
    ax3 = fig.add_subplot(gs[1, 0])
    vmax = np.max(np.abs(expected_preferences))
    vmin = -vmax
    im3 = ax3.imshow(expected_preferences.reshape(-1, planning_horizon),
                     cmap='RdBu_r', aspect='auto', vmin=vmin, vmax=vmax)
    ax3.set_title('Expected Preferences')
    ax3.set_xlabel('Time Step')
    ax3.set_ylabel('Policy')
    plt.colorbar(im3, ax=ax3)

    # Plot total EFE as bar chart
    ax4 = fig.add_subplot(gs[1, 1])
    policies = range(len(total_efe))
    ax4.bar(policies, total_efe)
    ax4.set_title('Total Expected Free Energy')
    ax4.set_xlabel('Policy')
    ax4.set_ylabel('EFE')

    # Adjust layout and save
    plt.tight_layout()
    plt.savefig(efe_dir / f'efe_components_t{timestep:03d}.png', dpi=300, bbox_inches='tight')
    plt.close()

    # Create policy comparison visualization
    fig, ax = plt.subplots(figsize=(10, 6))
    x = np.arange(len(total_efe))
    width = 0.25

    # Plot grouped bar chart
    ax.bar(x - width, ambiguity.mean(axis=1), width, label='Ambiguity', color='blue', alpha=0.7)
    ax.bar(x, risk.mean(axis=1), width, label='Risk', color='red', alpha=0.7)
    ax.bar(x + width, expected_preferences.mean(axis=1), width, label='Expected Preferences', color='green', alpha=0.7)

    ax.set_xlabel('Policy')
    ax.set_ylabel('Component Value')
    ax.set_title('EFE Components by Policy')
    ax.legend()

    plt.tight_layout()
    plt.savefig(efe_dir / f'policy_comparison_t{timestep:03d}.png', dpi=300, bbox_inches='tight')
    plt.close()

def test_full_configured_simulation(configured_pomdp, config):
    """Run a full simulation with the configured time horizon."""
    logger.info("\nStarting full configured simulation test")

    # Get total timesteps from config
    T = config['dimensions']['total_timesteps']
    logger.info(f"\nRunning simulation for {T} timesteps as configured")

    # Log initial configuration
    logger.info("\nModel Configuration:")
    logger.info(f"Observations: {configured_pomdp.num_observations}")
    logger.info(f"States: {configured_pomdp.num_states}")
    logger.info(f"Actions: {configured_pomdp.num_actions}")
    logger.info(f"Planning horizon: {config['dimensions']['planning_horizon']}")
    logger.info(f"Total timesteps: {T}")

    # Initialize history tracking
    history = {
        'observations': [],
        'actions': [],
        'free_energies': [],
        'beliefs': [],
        'efe_components': []  # EFE components for each timestep
    }

    # Store initial state
    history['beliefs'].append(configured_pomdp.state.beliefs.copy())

    # Run simulation for T timesteps
    for step in range(T):
        logger.info(f"\nTimestep {step + 1}/{T}")

        # Get EFE components before taking step
        efe_components = configured_pomdp.get_efe_components()
        history['efe_components'].append(efe_components)

        # Take step and record results
        obs, fe = configured_pomdp.step()

        # Log step results
        logger.info(f"Observation: {obs}")
        logger.info(f"Free Energy: {fe:.4f}")
        logger.info(f"Updated beliefs: {configured_pomdp.state.beliefs}")

        # Store history
        history['observations'].append(obs)
        history['actions'].append(configured_pomdp.state.history['actions'][-1])
        history['free_energies'].append(fe)
        history['beliefs'].append(configured_pomdp.state.beliefs.copy())

    # Calculate and log summary statistics
    logger.info("\nSimulation Summary:")
    logger.info(f"Average Free Energy: {np.mean(history['free_energies']):.4f}")

    # Observation statistics
    obs_counts = np.bincount(history['observations'], minlength=configured_pomdp.num_observations)
    logger.info(f"Observation distribution: {obs_counts}")
    logger.info(f"Most frequent observation: {np.argmax(obs_counts)} (count: {np.max(obs_counts)})")

    # Action statistics
    action_counts = np.bincount(history['actions'], minlength=configured_pomdp.num_actions)
    logger.info(f"Action distribution: {action_counts}")
    logger.info(f"Most frequent action: {np.argmax(action_counts)} (count: {np.max(action_counts)})")

    # Belief evolution
    belief_entropies = [-np.sum(b * np.log(b + 1e-12)) for b in history['beliefs']]
    logger.info(f"Initial belief entropy: {belief_entropies[0]:.4f}")
    logger.info(f"Final belief entropy: {belief_entropies[-1]:.4f}")
    logger.info(f"Belief entropy reduction: {belief_entropies[0] - belief_entropies[-1]:.4f}")

    # EFE component analysis
    avg_ambiguity = np.mean([comp['ambiguity'].mean() for comp in history['efe_components']])
    avg_risk = np.mean([comp['risk'].mean() for comp in history['efe_components']])
    avg_preferences = np.mean([comp['expected_preferences'].mean() for comp in history['efe_components']])
    logger.info("\nEFE Component Analysis:")
    logger.info(f"Average Ambiguity: {avg_ambiguity:.4f}")
    logger.info(f"Average Risk: {avg_risk:.4f}")
    logger.info(f"Average Expected Preferences: {avg_preferences:.4f}")

    # Validate results
    assert len(history['observations']) == T, f"Expected {T} observations"
    assert len(history['actions']) == T, f"Expected {T} actions"
    assert len(history['free_energies']) == T, f"Expected {T} free energy values"
    assert all(np.isfinite(fe) for fe in history['free_energies']), "All free energies should be finite"
    assert all(0 <= obs < configured_pomdp.num_observations
               for obs in history['observations']), "All observations should be valid"

    logger.info("\nFull configured simulation completed successfully")

def test_matrix_numerical_properties(configured_pomdp):
    """Test numerical properties of matrices."""
    # Test condition numbers
    A_cond = np.linalg.cond(configured_pomdp.A)
    assert A_cond < 1e6, f"A matrix condition number too high: {A_cond}"

    for a in range(configured_pomdp.num_actions):
        B_cond = np.linalg.cond(configured_pomdp.B[:, :, a])
        assert B_cond < 1e6, f"B matrix condition number too high for action {a}: {B_cond}"

    # Test for NaN and Inf values
    assert not np.any(np.isnan(configured_pomdp.A)), "A matrix contains NaN values"
    assert not np.any(np.isinf(configured_pomdp.A)), "A matrix contains Inf values"

    assert not np.any(np.isnan(configured_pomdp.B)), "B matrix contains NaN values"
    assert not np.any(np.isinf(configured_pomdp.B)), "B matrix contains Inf values"

    assert not np.any(np.isnan(configured_pomdp.C)), "C matrix contains NaN values"
    assert not np.any(np.isinf(configured_pomdp.C)), "C matrix contains Inf values"

    assert not np.any(np.isnan(configured_pomdp.D)), "D matrix contains NaN values"
    assert not np.any(np.isinf(configured_pomdp.D)), "D matrix contains Inf values"

    assert not np.any(np.isnan(configured_pomdp.E)), "E matrix contains NaN values"
    assert not np.any(np.isinf(configured_pomdp.E)), "E matrix contains Inf values"

def test_matrix_consistency(configured_pomdp):
    """Test consistency of matrix dimensions and properties."""
    # Test dimension consistency
    assert configured_pomdp.A.shape == (configured_pomdp.num_observations, configured_pomdp.num_states), \
        "A matrix dimensions inconsistent"

    assert configured_pomdp.B.shape == (configured_pomdp.num_states, configured_pomdp.num_states,
                                        configured_pomdp.num_actions), "B matrix dimensions inconsistent"

    assert configured_pomdp.C.shape[0] == configured_pomdp.num_observations, \
        "C matrix observation dimension inconsistent"

    assert configured_pomdp.D.shape == (configured_pomdp.num_states,), \
        "D matrix dimension inconsistent"

    assert configured_pomdp.E.shape == (configured_pomdp.num_actions,), \
        "E matrix dimension inconsistent"

    # Test probability constraints
    for a in range(configured_pomdp.num_actions):
        # Each column in B should represent a valid transition probability distribution
        assert np.all(np.abs(configured_pomdp.B[:, :, a].sum(axis=0) - 1.0) < 1e-10), \
            f"B matrix for action {a} contains invalid probability distributions"

    # Each column in A should represent a valid observation probability distribution
    assert np.all(np.abs(configured_pomdp.A.sum(axis=0) - 1.0) < 1e-10), \
        "A matrix contains invalid probability distributions"

def test_belief_update_stability(configured_pomdp):
    """Test numerical stability of belief updates under extreme conditions."""
    # Test with very small beliefs
    configured_pomdp.state.beliefs = np.ones(configured_pomdp.num_states) * 1e-10
    configured_pomdp.state.beliefs /= configured_pomdp.state.beliefs.sum()

    # Should not raise and should give valid beliefs
    obs, fe = configured_pomdp.step()
    assert np.allclose(configured_pomdp.state.beliefs.sum(), 1.0), "Beliefs not normalized after update"
    assert np.all(configured_pomdp.state.beliefs >= 0), "Negative beliefs after update"
    assert np.all(np.isfinite(configured_pomdp.state.beliefs)), "Non-finite beliefs after update"

    # Test with very concentrated beliefs
    configured_pomdp.state.beliefs = np.zeros(configured_pomdp.num_states)
    configured_pomdp.state.beliefs[0] = 1.0 - 1e-10
    configured_pomdp.state.beliefs[1:] = 1e-10 / (configured_pomdp.num_states - 1)

    # Should not raise and should give valid beliefs
    obs, fe = configured_pomdp.step()
    assert np.allclose(configured_pomdp.state.beliefs.sum(), 1.0), "Beliefs not normalized after update"
    assert np.all(configured_pomdp.state.beliefs >= 0), "Negative beliefs after update"
    assert np.all(np.isfinite(configured_pomdp.state.beliefs)), "Non-finite beliefs after update"

def test_preference_learning(configured_pomdp):
    """Test learning with different preference configurations."""
    # Store original matrices
    original_C = configured_pomdp.C.copy()
    original_B = configured_pomdp.B.copy()
    original_A = configured_pomdp.A.copy()

    try:
        # Set very strong preference for first observation
        configured_pomdp.C[0, :] = 5.0    # Strong positive preference for observation 0
        configured_pomdp.C[1:, :] = -2.0  # Negative preference for other observations

        # Modify the observation model to make observation 0 achievable:
        # make the first state likely to generate observation 0
        configured_pomdp.A[0, 0] = 0.8  # High probability of observation 0 from state 0
        configured_pomdp.A[1:, 0] = 0.2 / (configured_pomdp.num_observations - 1)  # Distribute remaining probability

        # Normalize A matrix columns
        configured_pomdp.A /= configured_pomdp.A.sum(axis=0, keepdims=True)

        # Give every action identity transitions, then make action 0 lead to
        # state 0 with high probability (columns remain normalized)
        for a in range(configured_pomdp.num_actions):
            configured_pomdp.B[:, :, a] = np.eye(configured_pomdp.num_states)
        configured_pomdp.B[0, :, 0] = 0.8
        configured_pomdp.B[1:, :, 0] = 0.2 / (configured_pomdp.num_states - 1)

        # Reset beliefs to uniform
        configured_pomdp.state.beliefs = np.ones(configured_pomdp.num_states) / configured_pomdp.num_states

        # Run simulation
        n_steps = 50  # Long enough for preferences to dominate exploration
        observations = []
        for _ in range(n_steps):
            obs, _ = configured_pomdp.step()
            observations.append(obs)

        # Check if preferred observations occur more frequently
        obs_counts = np.bincount(observations, minlength=configured_pomdp.num_observations)
        logger.info(f"Observation counts: {obs_counts}")
        logger.info(f"Observation 0 count: {obs_counts[0]}")
        logger.info(f"Mean of other observations: {np.mean(obs_counts[1:]):.2f}")

        # Require at least 30% more observations of type 0 than the average of the others
        assert obs_counts[0] > 1.3 * np.mean(obs_counts[1:]), \
            f"Preferred observation not selected more frequently. Got {obs_counts[0]} vs mean {np.mean(obs_counts[1:]):.2f}"

    finally:
        # Restore original matrices
        configured_pomdp.C = original_C
        configured_pomdp.B = original_B
        configured_pomdp.A = original_A
        # Reset beliefs to uniform
        configured_pomdp.state.beliefs = np.ones(configured_pomdp.num_states) / configured_pomdp.num_states

def test_policy_tree_generation(small_pomdp):
    """Test policy tree generation and pruning."""
    # Get policy tree
    components = small_pomdp.get_efe_components()
    policies = components['policies']

    # Check basic properties
    assert isinstance(policies, list)
    assert all(isinstance(p, list) for p in policies)
    assert all(all(isinstance(a, int) for a in p) for p in policies)
    assert all(0 <= a < small_pomdp.num_actions for p in policies for a in p)

    # Check policy lengths
    assert all(len(p) <= small_pomdp.planning_horizon for p in policies)

    # Check number of policies is bounded
    assert len(policies) <= small_pomdp.max_policies

    # Check policy evaluation components
    assert components['ambiguity'].shape == (len(policies), small_pomdp.planning_horizon)
    assert components['risk'].shape == (len(policies), small_pomdp.planning_horizon)
    assert components['expected_preferences'].shape == (len(policies), small_pomdp.planning_horizon)
    assert components['total_efe'].shape == (len(policies),)

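# Minimal sketch of how the per-policy components checked above could be
# aggregated into 'total_efe'. This follows a common active-inference
# convention (EFE = ambiguity + risk - expected preferences, summed over the
# planning horizon); the actual aggregation inside GenericPOMDP may differ,
# so treat this as an assumption. Not used by the tests.
def _reference_total_efe(ambiguity, risk, expected_preferences):
    """Aggregate (n_policies, horizon) components into a (n_policies,) EFE."""
    return (ambiguity + risk - expected_preferences).sum(axis=1)
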
def test_temporal_preference_evaluation(small_pomdp):
    """Test evaluation of preferences over time."""
    # Set time-varying preferences
    small_pomdp.C = np.zeros((small_pomdp.num_observations, small_pomdp.planning_horizon))
    small_pomdp.C[0, 0] = 1.0   # Prefer first observation initially
    small_pomdp.C[1, -1] = 2.0  # Strongly prefer second observation later

    # Get policy evaluation
    components = small_pomdp.get_efe_components()

    # Policies should show preference for delayed reward
    best_policy_idx = np.argmin(components['total_efe'])
    best_policy = components['policies'][best_policy_idx]

    # Check that later actions in the best policy lead to the preferred observation
    final_beliefs = small_pomdp.state.beliefs.copy()
    for action in best_policy:
        final_beliefs = small_pomdp.B[:, :, action] @ final_beliefs
    final_obs_probs = small_pomdp.A @ final_beliefs

    # Should have high probability for the second observation
    assert final_obs_probs[1] > 0.3

def test_policy_pruning(medium_pomdp):
    """Test that policy pruning maintains diversity and performance."""
    # Get full policy evaluation
    components = medium_pomdp.get_efe_components()
    policies = components['policies']
    total_efe = components['total_efe']

    # Check pruning keeps best policies
    best_idx = np.argmin(total_efe)
    best_efe = total_efe[best_idx]

    # Run with smaller max_policies
    medium_pomdp.max_policies = len(policies) // 2
    pruned_components = medium_pomdp.get_efe_components()
    pruned_total_efe = pruned_components['total_efe']

    # Best pruned policy should be close to original best
    best_pruned_efe = np.min(pruned_total_efe)
    assert np.abs(best_pruned_efe - best_efe) < 1.0

    # Check policy diversity (not all same first action)
    first_actions = [p[0] for p in pruned_components['policies']]
    assert len(set(first_actions)) > 1

def test_belief_propagation(small_pomdp):
    """Test belief propagation through policy evaluation."""
    # Set specific initial beliefs
    small_pomdp.state.beliefs = np.zeros(small_pomdp.num_states)
    small_pomdp.state.beliefs[0] = 1.0

    # Get policy evaluation
    components = small_pomdp.get_efe_components()
    policies = components['policies']

    # Check belief propagation for first policy
    test_policy = policies[0]
    beliefs = small_pomdp.state.beliefs.copy()

    for t, action in enumerate(test_policy):
        # Manual belief propagation
        beliefs = small_pomdp.B[:, :, action] @ beliefs

        # Get predicted observations
        pred_obs = small_pomdp.A @ beliefs

        # Compare with stored components
        policy_idx = 0  # Testing first policy

        # Check ambiguity matches entropy of beliefs
        expected_ambiguity = -np.sum(beliefs * np.log(beliefs + small_pomdp.stability_threshold))
        assert np.abs(components['ambiguity'][policy_idx, t] - expected_ambiguity) < 1e-5

        # Check risk computation
        expected_risk = small_pomdp._compute_kl_divergence(
            pred_obs,
            small_pomdp._softmax(small_pomdp.C[:, t])
        )
        assert np.abs(components['risk'][policy_idx, t] - expected_risk) < 1e-5

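# Minimal sketch of the KL divergence that _compute_kl_divergence above is
# assumed to implement, KL(p || q) = sum_i p_i * log(p_i / q_i), with a small
# epsilon for numerical safety. Illustrative only; not used by the tests.
def _reference_kl_divergence(p, q, eps=1e-12):
    """KL divergence between two discrete distributions."""
    p = np.asarray(p, dtype=float)
    q = np.asarray(q, dtype=float)
    return float(np.sum(p * np.log((p + eps) / (q + eps))))
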
if __name__ == '__main__':
    # When run directly, execute pytest with full output
    sys.exit(pytest.main([__file__, "-v", "--capture=tee-sys"]))