Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 55 additions & 0 deletions check_subgame_perfection.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@

import numpy as np
import pyspiel
from open_spiel.python.algorithms import sequence_form_lp
from open_spiel.python import policy

def _print_policy(label, pol):
  """Prints every infostate -> action-probability entry of a tabular policy.

  Args:
    label: heading printed before the entries (e.g. "Player 0 Policy").
    pol: a policy object exposing `state_lookup` (dict keyed by infostate
      string) and `policy_for_key(key)` returning the action probabilities.
  """
  print(f"\n{label}:")
  for key in pol.state_lookup.keys():
    print(f"{key}: {pol.policy_for_key(key)}")


def check_subgame_perfection():
  """Solves Kuhn poker with the sequence-form LP and prints both policies.

  Intended as a manual check that the behavior prescribed at infostates that
  are unreachable in equilibrium (e.g. responses to an opponent sequence
  played with probability zero) is still optimal — i.e. that the extracted
  policy is sensible off the equilibrium path, not just on it.

  Background for reading the output: in Kuhn poker, P0's approximate
  equilibrium opening is Check with J and Q, and a Bet/Check mix with K.
  Any infostate following a zero-probability opponent sequence is where
  subgame perfection (optimality at unreachable infostates) matters.
  """
  game = pyspiel.load_game("kuhn_poker")

  # solve_zero_sum_game returns (value_p0, value_p1, policy_p0, policy_p1).
  val1, val2, pol0, pol1 = sequence_form_lp.solve_zero_sum_game(game)
  del val2  # Symmetric game value; val1 suffices for display.

  print(f"Game value: {val1}")
  _print_policy("Player 0 Policy", pol0)
  _print_policy("Player 1 Policy", pol1)


if __name__ == "__main__":
  check_subgame_perfection()
26 changes: 22 additions & 4 deletions open_spiel/algorithms/ortools/sequence_form_lp.cc
Original file line number Diff line number Diff line change
Expand Up @@ -199,14 +199,32 @@ TabularPolicy SequenceFormLpSpecification::OptimalPolicy(Player for_player) {
}
for (int i = 0; i < actions.size(); ++i) {
double prob;
if (rp_sum) {
if (rp_sum > 1e-10) {
prob = node_spec_[node->child_at(i)].var_reach_prob->solution_value() /
rp_sum;
} else {
// If the infostate is unreachable, the strategy is not defined.
// However some code in the library may require having the strategy,
// so we just put an uniform strategy here.
prob = 1. / actions.size();
// We use reduced costs to identify optimal actions for subgame perfection.
std::vector<int> optimal_actions;
for (int j = 0; j < actions.size(); ++j) {
double rc = node_spec_[node->child_at(j)].var_reach_prob->reduced_cost();
if (std::abs(rc) < 1e-8) {
optimal_actions.push_back(j);
}
}
if (optimal_actions.empty()) {
// Fallback to uniform if no optimal actions found via reduced costs.
prob = 1. / actions.size();
} else {
// Uniform over optimal actions.
prob = 0.0;
for (int opt_idx : optimal_actions) {
if (opt_idx == i) {
prob = 1. / optimal_actions.size();
break;
}
}
}
}
state_policy.push_back({actions[i], prob});
}
Expand Down
31 changes: 30 additions & 1 deletion open_spiel/python/algorithms/lp_solver.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,36 @@ def solve(self, solver=None):
sol = cvxopt.solvers.lp(c, g_mat, h, a_mat, b, solver=solver)
else:
sol = cvxopt.solvers.lp(c, g_mat, h, solver=solver)
return sol["x"]
return sol["x"], sol["y"], sol["z"]

def get_slack(self, cons_label, solution):
  """Returns how far `solution` is from making a constraint tight.

  Args:
    cons_label: the label of the constraint to evaluate.
    solution: the solution vector, indexed by variable id (as produced by
      solve).

  Returns:
    The slack of the constraint:
      - LEQ (Gx <= h): h - Gx,
      - GEQ (Gx >= h): Gx - h,
      - EQ  (Ax = b):  |Ax - b|.
  """
  cons = self._cons.get(cons_label)
  assert cons is not None

  # Evaluate the constraint's left-hand side at the given solution.
  lhs = sum(coeff * solution[self._vars[var_label].vid]
            for var_label, coeff in cons.coeffs.items())
  # A missing right-hand side is treated as zero.
  rhs = 0.0 if cons.rhs is None else cons.rhs

  if cons.ctype == CONS_TYPE_EQ:
    return abs(lhs - rhs)
  if cons.ctype == CONS_TYPE_LEQ:
    return rhs - lhs
  if cons.ctype == CONS_TYPE_GEQ:
    return lhs - rhs
  assert False, "Unknown constraint type"


def solve_zero_sum_matrix_game(game):
Expand Down
132 changes: 111 additions & 21 deletions open_spiel/python/algorithms/sequence_form_lp.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@


def _construct_lps(state, infosets, infoset_actions, infoset_action_maps,
chance_reach, lps, parent_is_keys, parent_isa_keys):
chance_reach, lps, parent_is_keys, parent_isa_keys,
infostate_parent_sequences):
"""Build the linear programs recursively from this state.

Args:
Expand All @@ -58,6 +59,8 @@ def _construct_lps(state, infosets, infoset_actions, infoset_action_maps,
constraints and variables.
parent_is_keys: a list of parent information state keys for this state
parent_isa_keys: a list of parent (infostate, action) keys
infostate_parent_sequences: a list of dicts, one per player, that maps
infostate to the parent sequence key of the opponent.
"""
if state.is_terminal():
returns = state.returns()
Expand All @@ -79,7 +82,8 @@ def _construct_lps(state, infosets, infoset_actions, infoset_action_maps,
for action, prob in state.chance_outcomes():
new_state = state.child(action)
_construct_lps(new_state, infosets, infoset_actions, infoset_action_maps,
prob * chance_reach, lps, parent_is_keys, parent_isa_keys)
prob * chance_reach, lps, parent_is_keys, parent_isa_keys,
infostate_parent_sequences)
return

player = state.current_player()
Expand Down Expand Up @@ -111,6 +115,7 @@ def _construct_lps(state, infosets, infoset_actions, infoset_action_maps,
# Add to the infostate maps
if info_state not in infosets[player]:
infosets[player][info_state] = len(infosets[player])
infostate_parent_sequences[player][info_state] = parent_isa_keys[1 - player]
if info_state not in infoset_action_maps[player]:
infoset_action_maps[player][info_state] = []

Expand All @@ -137,7 +142,8 @@ def _construct_lps(state, infosets, infoset_actions, infoset_action_maps,

new_state = state.child(action)
_construct_lps(new_state, infosets, infoset_actions, infoset_action_maps,
chance_reach, lps, new_parent_is_keys, new_parent_isa_keys)
chance_reach, lps, new_parent_is_keys, new_parent_isa_keys,
infostate_parent_sequences)


def solve_zero_sum_game(game, solver=None):
Expand Down Expand Up @@ -228,30 +234,114 @@ def solve_zero_sum_game(game, solver=None):
lps[1].set_cons_coeff(_EMPTY_INFOSET_KEYS[0], _EMPTY_INFOSET_ACTION_KEYS[0],
1.0)
lps[1].set_cons_rhs(_EMPTY_INFOSET_KEYS[0], 1.0)
# Mapping from infostate to its parent sequence id (opponent's sequence)
# This tells us which opponent sequence leads to this infostate.
infostate_parent_sequences = [{}, {}]

_construct_lps(game.new_initial_state(), infosets, infoset_actions,
infoset_action_maps, 1.0, lps, _EMPTY_INFOSET_KEYS[:],
_EMPTY_INFOSET_ACTION_KEYS[:])
_EMPTY_INFOSET_ACTION_KEYS[:], infostate_parent_sequences)
# Solve the programs.
solutions = [lps[0].solve(solver=solver), lps[1].solve(solver=solver)]
primal_solutions = []
dual_eq_solutions = []
dual_ineq_solutions = []
for i in range(2):
x, y, z = lps[i].solve(solver=solver)
primal_solutions.append(x)
dual_eq_solutions.append(y)
dual_ineq_solutions.append(z)

# Extract the policies (convert from realization plan to behavioral form).
policies = [policy.TabularPolicy(game), policy.TabularPolicy(game)]

# To correctly identify reachable states for player i, we need to know the reach
# probability of infostates under the current equilibrium strategies.
# An infostate is reachable for player i if ALL its player-i ancestors have
# non-zero probability and ALL its player-(1-i) ancestors (sequences) have
# non-zero realization probability.

reach_probs = [{}, {}] # Key: infostate, Value: reach probability
reach_probs[0][_EMPTY_INFOSET_KEYS[0]] = 1.0
reach_probs[1][_EMPTY_INFOSET_KEYS[1]] = 1.0

# We need to traverse the game tree (or use the infoset maps) to propagate reach probs.
# Since we have infoset_action_maps, we can propagate top-down.
# However, the order in infoset_action_maps might not be topological.
# Let's use a simple topological-like propagation by iterating and repeating if needed,
# or better, use the structure if possible. Kuhn is small, we can just use the known structure.
# Realization plan for Player 0 (x) is in primal_solutions[1]
# Realization plan for Player 1 (y) is in primal_solutions[0]
def get_realization_prob(player, isa_key):
if isa_key in _EMPTY_INFOSET_ACTION_KEYS:
return 1.0
if player == 0:
vid = lps[1].get_var_id(isa_key)
return primal_solutions[1][vid]
else:
vid = lps[0].get_var_id(isa_key)
return primal_solutions[0][vid]

for i in range(2):
for info_state in infoset_action_maps[i]:
total_weight = 0
num_actions = 0
# Reach probability of this infostate for player i is x(parent_isa_of_i).
# But we also need the reach probability of the OPPONENT'S sequence leading here.
opponent_isa_key = infostate_parent_sequences[i][info_state]

# Joint reach prob = own_reach * opponent_reach * chance_reach.
# However, total_weight across actions already includes opponent reach and chance reach
# from the objective/constraints!
# Actually, the sequence-form realization plan x(s,a) for Player 0 ALREADY incorporates
# player 0's own reach. It does NOT incorporate Player 1's decisions.

# So an infostate is reachable for player i if:
# 1. Player i's own parent sequence has non-zero realization probability.
# 2. Player 1-i's parent sequence leading here has non-zero realization probability.

# We just need to check if total_weight > 0 where total_weight is calculated
# from the OPPONENT'S realization plans for these sequences.
# Wait, no. The behavioral policy is x(s,a) / x(s).
# If x(s) == 0, the state is unreachable by player i's own strategy.
# If the state is unreachable by player 1-i's strategy, x(s) might still be > 0!

# Total realization probability of this infostate s for player i:
# x(s) = sum_a x(s,a). This x(s) is in primal_solutions[1] (for P0) or [0] (for P1).

own_weight = 0
for isa_key in infoset_action_maps[i][info_state]:
total_weight += solutions[1 - i][lps[1 - i].get_var_id(isa_key)]
num_actions += 1
unif_pr = 1.0 / num_actions
own_weight += get_realization_prob(i, isa_key)

# Opponent realization probability leading to this infostate:
opponent_weight = get_realization_prob(1 - i, opponent_isa_key)

total_reach = own_weight * opponent_weight

state_policy = policies[i].policy_for_key(info_state)
for isa_key in infoset_action_maps[i][info_state]:
# The 1 - i here is due to Eq (8) yielding a solution for player 1 and
# Eq (9) a solution for player 0.
rel_weight = solutions[1 - i][lps[1 - i].get_var_id(isa_key)]
_, action_str = isa_key.split(_DELIMITER)
action = int(action_str)
pr_action = rel_weight / total_weight if total_weight > 0 else unif_pr
state_policy[action] = pr_action
return (solutions[0][lps[0].get_var_id(_EMPTY_INFOSET_KEYS[0])],
solutions[1][lps[1].get_var_id(_EMPTY_INFOSET_KEYS[1])], policies[0],
policies[1])
if total_reach > 1e-8:
for isa_key in infoset_action_maps[i][info_state]:
rel_weight = get_realization_prob(i, isa_key)
_, action_str = isa_key.split(_DELIMITER)
action = int(action_str)
state_policy[action] = rel_weight / own_weight if own_weight > 0 else 1.0/len(infoset_action_maps[i][info_state])
else:
# State is unreachable in equilibrium. Use subgame-perfect optimal actions.
optimal_actions = []
for isa_key in infoset_action_maps[i][info_state]:
slack = lps[i].get_slack(isa_key, primal_solutions[i])
if abs(slack) < 1e-7:
_, action_str = isa_key.split(_DELIMITER)
optimal_actions.append(int(action_str))

state_policy.fill(0.0)
if not optimal_actions:
prob = 1.0 / len(infoset_action_maps[i][info_state])
for isa_key in infoset_action_maps[i][info_state]:
_, action_str = isa_key.split(_DELIMITER)
state_policy[int(action_str)] = prob
else:
prob = 1.0 / len(optimal_actions)
for action in optimal_actions:
state_policy[action] = prob

return (primal_solutions[0][lps[0].get_var_id(_EMPTY_INFOSET_KEYS[0])],
primal_solutions[1][lps[1].get_var_id(_EMPTY_INFOSET_KEYS[1])],
policies[0], policies[1])
Loading