import gym
import numpy as np
import itertools as it
from pathlib import Path
from enum import Enum, IntEnum
from termcolor import colored
from el2805.envs.maze import Maze, MazeCell
from el2805.envs.grid_world import Move, Position
from el2805.utils import decide_random


class Progress(IntEnum):
    EATEN = 0         # eaten
    WITHOUT_KEYS = 1  # keys not collected yet
    WITH_KEYS = 2     # keys collected (or not present in the maze)
    EXITED = 3        # exited alive


class MinotaurMazeCell(Enum):
    KEY = "C"

    def __str__(self):
        return self.value


# (player position, minotaur position, progress)
State = tuple[Position, Position, Progress]


class MinotaurMaze(Maze):
    _reward_key = 1
    _reward_exit = 1
    _probability_chase_move = 0.35
    _sentinel_position = (-1, -1)

    def __init__(
            self,
            *,
            map_filepath: Path,
            horizon: int | None = None,
            minotaur_nop: bool = False,
            probability_poison_death: float = 0,
            minotaur_chase: bool = False,
            keys: bool = False
    ):
        self.keys = keys
        super().__init__(map_filepath, horizon)
        self.minotaur_nop = minotaur_nop
        self.probability_poison_death = probability_poison_death
        self.minotaur_chase = minotaur_chase
        assert not (self.probability_poison_death > 0 and self.finite_horizon())  # poison only for discounted MDPs

        self.observation_space = gym.spaces.Tuple((
            gym.spaces.MultiDiscrete(self.map.shape),  # player
            gym.spaces.MultiDiscrete(self.map.shape),  # minotaur
            gym.spaces.Discrete(n=len(Progress))       # progress (key not collected, key collected, exited, eaten)
        ))

        if not self.minotaur_chase:
            self._probability_chase_move = 0

        if self.finite_horizon():
            # important: since this is an additional objective, the worst-case penalty should be much lower than the
            # exit reward. Otherwise, the player might prioritize minimizing the average time to exit, resulting in a
            # lower probability of exiting alive.
            self._reward_step = -0.0001
        else:
            # for discounted MDPs, we do not need this reward:
            # the exit reward is discounted, so the player will not waste time anyway
            self._reward_step = 0

        self._states = self._generate_state_space()
        self._state_to_index = {state: s for s, state in enumerate(self._states)}

    def reward(self, state: State, action: Move, mean: bool = False) -> float:
        assert action in self.valid_actions(state)
        if mean:
            next_states, transition_probabilities = self.next_states(state, action)
            rewards = np.asarray([self._reward(state, next_state) for next_state in next_states])
            reward = transition_probabilities.dot(rewards)  # mean reward
        else:
            next_state = self._next_state(state, action)
            reward = self._reward(state, next_state)
        return reward

    def _reward(self, state: State, next_state: State) -> float:
        _, _, progress = state
        _, _, next_progress = next_state

        # terminal state (absorbing): nothing happens
        if self.terminal_state(state):
            reward = 0
        # main objective: maximize the probability of exiting alive before the time expires
        # <=> maximize reward by collecting the key and exit rewards
        # => positive reward for collecting the keys and for exiting alive
        elif next_progress is not Progress.EATEN and \
                progress is Progress.WITHOUT_KEYS and next_progress is Progress.WITH_KEYS:
            reward = self._reward_key
        elif next_progress is not Progress.EATEN and \
                progress is Progress.WITH_KEYS and next_progress is Progress.EXITED:
            reward = self._reward_exit
        # additional objective: don't waste time while alive
        # <=> minimize the time to exit <=> maximize the negative time to exit
        # notice that for discounted MDPs the step penalty is set to 0 in the constructor
        else:
            reward = self._reward_step

        return reward
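
    # Illustrative back-of-the-envelope check (not part of the original code): with the finite-horizon settings above,
    # an episode that collects the keys, exits alive, and takes T non-rewarding steps in between accumulates
    #   _reward_key + _reward_exit + T * _reward_step = 1 + 1 - 0.0001 * T,
    # so even for large T the accumulated step penalty stays far below the key/exit rewards, as intended by the
    # comment in __init__.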

    def next_states(self, state: State, action: int) -> tuple[list[State], np.ndarray]:
        if self.terminal_state(state):
            next_states = [state]
            transition_probabilities = np.asarray([1])
        else:
            transition_probabilities = {}

            # example: P('up') =
            # = P('random move', 'up') + P('chase move', 'up')
            # = P('random move') * P('up' | 'random move') + P('chase move') * P('up' | 'chase move')
            # here we add the first part...
            random_moves = self._valid_minotaur_moves(state, chase=False)
            probability_random_move = (1 - self._probability_chase_move) / len(random_moves)
            for minotaur_move in random_moves:
                next_state = self._next_state(state, action, minotaur_move)
                if next_state not in transition_probabilities:
                    transition_probabilities[next_state] = 0
                transition_probabilities[next_state] += probability_random_move

            if self.minotaur_chase:
                # ...and here the second part
                # chase moves stay within the map and are a subset of the random moves above,
                # so every chase next state already has an entry in the dictionary
                chase_moves = self._valid_minotaur_moves(state, chase=True)
                probability_chase_move = self._probability_chase_move / len(chase_moves)
                for minotaur_move in chase_moves:
                    next_state = self._next_state(state, action, minotaur_move)
                    transition_probabilities[next_state] += probability_chase_move

            next_states = list(transition_probabilities.keys())
            transition_probabilities = np.asarray(list(transition_probabilities.values()))
        return next_states, transition_probabilities
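
    # Concrete illustration of the mixture above (not part of the original code): suppose minotaur_chase is enabled,
    # _probability_chase_move = 0.35, the minotaur has 4 valid random moves, and exactly 1 chase move, which is also
    # one of the random moves. Then that move's next state gets probability
    #   (1 - 0.35) / 4 + 0.35 / 1 = 0.1625 + 0.35 = 0.5125,
    # each of the other 3 random moves gets (1 - 0.35) / 4 = 0.1625, and the probabilities sum to 1.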

    def _next_state(self, state: State, action: int, minotaur_move: Move | None = None) -> State:
        player_position, minotaur_position, progress = state
        action = Move(action)
        if action not in self.valid_actions(state):
            raise ValueError(f"Invalid action {action}")

        if self.terminal_state(state):
            pass  # state stays the same (absorbing state)
        else:
            if minotaur_move is None:
                chase = self.minotaur_chase and decide_random(self._rng, self._probability_chase_move)
                valid_minotaur_moves = self._valid_minotaur_moves(state, chase=chase)
                minotaur_move = self._rng.choice(valid_minotaur_moves)

            next_player_position = super()._next_state(player_position, action)
            next_minotaur_position = super()._next_state(minotaur_position, minotaur_move)

            if next_player_position == next_minotaur_position:
                state = (self._sentinel_position, self._sentinel_position, Progress.EATEN)
            elif progress is Progress.WITH_KEYS and self.map[next_player_position] is MazeCell.EXIT:
                state = (self._sentinel_position, self._sentinel_position, Progress.EXITED)
            elif progress is Progress.WITHOUT_KEYS and self.map[next_player_position] is MinotaurMazeCell.KEY:
                state = (next_player_position, next_minotaur_position, Progress.WITH_KEYS)
            else:
                state = (next_player_position, next_minotaur_position, progress)
        return state

    def valid_actions(self, state: State | Position) -> list[Move]:
        if self.terminal_state(state):
            valid_moves = [Move.NOP]
        else:
            if isinstance(state, tuple) and isinstance(state[0], int):
                player_position = state  # called by the parent class with only the player position
            else:
                player_position, _, _ = state
            valid_moves = super().valid_actions(player_position)
        return valid_moves

    def _valid_minotaur_moves(self, state: State, chase: bool) -> list[Move]:
        valid_moves = self._chase_minotaur_moves(state) if chase else self._random_minotaur_moves(state)
        return valid_moves

    def _random_minotaur_moves(self, state: State) -> list[Move]:
        random_moves = []
        if self.terminal_state(state):
            random_moves.append(Move.NOP)
        else:
            _, minotaur_position, _ = state
            x_minotaur, y_minotaur = minotaur_position
            # the minotaur is not blocked by walls, so only the map borders constrain its moves
            if self.minotaur_nop:
                random_moves.append(Move.NOP)
            if x_minotaur - 1 >= 0:
                random_moves.append(Move.UP)
            if x_minotaur + 1 < self.map.shape[0]:
                random_moves.append(Move.DOWN)
            if y_minotaur - 1 >= 0:
                random_moves.append(Move.LEFT)
            if y_minotaur + 1 < self.map.shape[1]:
                random_moves.append(Move.RIGHT)
        return random_moves

    def _chase_minotaur_moves(self, state: State) -> list[Move]:
        chase_moves = []
        if self.terminal_state(state):
            chase_moves.append(Move.NOP)
        else:
            player_position, minotaur_position, _ = state
            x_player, y_player = player_position
            x_minotaur, y_minotaur = minotaur_position
            delta_x = x_player - x_minotaur
            delta_y = y_player - y_minotaur
            assert abs(delta_x) > 0 or abs(delta_y) > 0  # otherwise the player would have been eaten (terminal state)

            # move towards the player along the direction with the smallest absolute delta
            # if the smallest absolute delta is 0 (already aligned along that direction), move along the other one
            # notice that there can be two moves resulting in the same distance
            if delta_x != 0 and (delta_y == 0 or abs(delta_x) <= abs(delta_y)):
                if delta_x < 0:
                    chase_moves.append(Move.UP)
                else:
                    chase_moves.append(Move.DOWN)
            if delta_y != 0 and (delta_x == 0 or abs(delta_y) <= abs(delta_x)):
                if delta_y < 0:
                    chase_moves.append(Move.LEFT)
                else:
                    chase_moves.append(Move.RIGHT)
        return chase_moves

    def _horizon_reached(self) -> bool:
        # random time horizon, geometrically distributed
        if self.probability_poison_death > 0:
            horizon_reached = decide_random(self._rng, self.probability_poison_death)
        else:
            horizon_reached = super()._horizon_reached()
        return horizon_reached

    def terminal_state(self, state: State | Position) -> bool:
        if isinstance(state, tuple) and isinstance(state[0], int):  # called by parent class with only the player position
            terminal = False
        else:
            _, _, progress = state
            terminal = progress is Progress.EATEN or progress is Progress.EXITED
        return terminal

    def _generate_state_space(self) -> list[State]:
        # minotaur: anywhere in the maze
        minotaur_states = [(x, y) for x in range(self.map.shape[0]) for y in range(self.map.shape[1])]

        # player: anywhere except walls
        player_states = [(x, y) for x, y in minotaur_states if self.map[x, y] is not MazeCell.WALL]

        # keys collected or not (if the maze has no keys, the player starts with the keys already collected)
        keys_collected = [Progress.WITHOUT_KEYS, Progress.WITH_KEYS] if self.keys else [Progress.WITH_KEYS]

        # Cartesian product
        states = list(it.product(player_states, minotaur_states, keys_collected))

        # collapse all terminal states into a single exited state and a single eaten state
        def non_terminal_state(state):
            player_position, minotaur_position, progress = state
            eaten = player_position == minotaur_position
            exited = progress is Progress.WITH_KEYS and self.map[player_position] is MazeCell.EXIT
            return not eaten and not exited
        states = [state for state in states if non_terminal_state(state)]
        states.append((self._sentinel_position, self._sentinel_position, Progress.EATEN))
        states.append((self._sentinel_position, self._sentinel_position, Progress.EXITED))

        return states

    def _won(self):
        _, _, progress = self._current_state
        return progress is Progress.EXITED

    def render(self, mode: str = "human", policy: np.ndarray = None) -> None:
        assert mode == "human" or (mode == "policy" and policy is not None and policy.shape == self.map.shape)
        map_ = self.map.copy()
        if mode == "human":
            player_position, minotaur_position, progress = self._current_state
            if progress is Progress.EATEN:
                print("LOSER...")
            elif progress is Progress.EXITED:
                print("WINNER!")
            else:
                if progress is Progress.WITHOUT_KEYS:
                    player_color = "red"
                elif progress is Progress.WITH_KEYS:
                    player_color = "green"
                else:
                    player_color = None
                map_[player_position] = colored("P", color=player_color)
                map_[minotaur_position] = colored("M", color="magenta")
                self._render(map_)
        elif mode == "policy":
            for i in range(map_.shape[0]):
                for j in range(map_.shape[1]):
                    if self.map[i, j] is not MazeCell.WALL:
                        map_[i, j] = Move(policy[i, j])
            self._render(map_)
        else:
            raise ValueError(f"Invalid rendering mode {mode}")

    def _load_map(self, filepath: Path) -> None:
        with open(filepath) as f:
            lines = f.readlines()

        # create the map
        self.map = np.asarray([[
            (MinotaurMazeCell(symbol) if symbol == MinotaurMazeCell.KEY.value else MazeCell(symbol))
            for symbol in line.rstrip("\n").split("\t")
        ] for line in lines])

        # get the starting positions of the player and of the minotaur
        player_start = np.asarray(self.map == MazeCell.START).nonzero()
        player_start = (int(player_start[0][0]), int(player_start[1][0]))
        minotaur_start = np.asarray(self.map == MazeCell.EXIT).nonzero()
        minotaur_start = (int(minotaur_start[0][0]), int(minotaur_start[1][0]))

        # if there are no keys to collect in the map, start with the keys already collected
        keys_present = np.asarray(self.map == MinotaurMazeCell.KEY).any()
        if self.keys:
            assert keys_present
            progress = Progress.WITHOUT_KEYS
        else:
            progress = Progress.WITH_KEYS

        self._initial_state = (player_start, minotaur_start, progress)

    # overridden only to avoid type-hint warnings
    @property
    def states(self) -> list[State]:
        return self._states

    def state_index(self, state: State) -> int:
        return self._state_to_index[state]
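

# Minimal usage sketch (illustrative, not part of the original module). The map file path is hypothetical, and the
# reset()/step() loop assumes the gym-style interface inherited from Maze/GridWorld, which is not defined in this
# file.
if __name__ == "__main__":
    maze = MinotaurMaze(
        map_filepath=Path("maps/minotaur_maze.txt"),  # hypothetical map file with a "C" key cell
        horizon=20,
        minotaur_chase=True,
        keys=True,
    )
    rng = np.random.default_rng(seed=0)

    state = maze.reset()  # assumed gym-style reset
    done = False
    while not done:
        # pick a random valid action, just to drive the environment
        action = rng.choice(maze.valid_actions(state))
        state, reward, done, _ = maze.step(action)  # assumed gym-style step
    maze.render()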