maze.py
import numpy as np
from pathlib import Path
from enum import Enum

from el2805.envs.grid_world import GridWorld, Move, Position


class MazeCell(Enum):
    EMPTY = "0"
    WALL = "#"
    START = "A"
    EXIT = "B"
    DELAY_R1 = "R1"
    DELAY_R2 = "R2"

    @property
    def delay(self) -> int:
        if self is MazeCell.DELAY_R1:
            d = 6
        elif self is MazeCell.DELAY_R2:
            d = 1
        else:
            d = 0
        return d

    def __str__(self):
        return self.value


class Maze(GridWorld):
    _reward_step = -1
    _reward_exit = -_reward_step
    _probability_delay = 0.5

    def __init__(self, map_filepath: Path, horizon: int | None = None):
        super().__init__(map_filepath, horizon)
        self._states = [
            (x, y) for x in range(self.map.shape[0]) for y in range(self.map.shape[1])
            if self.map[x, y] is not MazeCell.WALL
        ]
        self._state_to_index = {state: s for s, state in enumerate(self._states)}
    def step(self, action: int) -> tuple[Position, float, bool, dict]:
        state, reward, done, info = super().step(action)
        won = self._won()
        assert not (won and not done)
        info["won"] = won
        return state, reward, done, info
    def reward(self, state: Position, action: int, mean: bool = False) -> float:
        assert action in self.valid_actions(state)

        # terminal state (absorbing): nothing happens
        if self.terminal_state(state):
            reward = 0
        # main objective: minimize the time to exit <=> maximize the negative time to exit
        # => negative reward (penalty) at each step
        else:
            delay = self.map[state].delay
            reward_no_delay = self._reward_step
            reward_delay = (1 + delay) * self._reward_step

            # exit!
            # Pay attention: the reward for reaching the exit must be greater than the reward of another walk step.
            # Otherwise, with a horizon T equal to the shortest path length, the agent is not encouraged to exit the
            # maze: the total reward of exiting the maze (without staying there for at least 1 time step) would be
            # equal to the total reward of not exiting it.
            next_state = self._next_state(state, action)
            if self.terminal_state(next_state):
                reward_no_delay += self._reward_exit
                reward_delay += self._reward_exit

            if mean:
                reward = self._probability_delay * reward_delay + (1 - self._probability_delay) * reward_no_delay
            else:
                reward = self._rng.choice(
                    a=[reward_delay, reward_no_delay],
                    p=[self._probability_delay, 1 - self._probability_delay]
                )

        return reward
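
    # Illustrative worked example (not from the original module, just the arithmetic implied above): on a DELAY_R1
    # cell (delay = 6) with _reward_step = -1, the delayed reward is (1 + 6) * (-1) = -7 and the non-delayed reward
    # is -1, so with _probability_delay = 0.5 the mean reward is 0.5 * (-7) + 0.5 * (-1) = -4.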
    def valid_actions(self, state: Position) -> list[Move]:
        valid_moves = [Move.NOP]

        if not self.terminal_state(state):
            x, y = state

            x_tmp = x - 1
            if x_tmp >= 0 and self.map[x_tmp, y] is not MazeCell.WALL:
                valid_moves.append(Move.UP)

            x_tmp = x + 1
            if x_tmp < self.map.shape[0] and self.map[x_tmp, y] is not MazeCell.WALL:
                valid_moves.append(Move.DOWN)

            y_tmp = y - 1
            if y_tmp >= 0 and self.map[x, y_tmp] is not MazeCell.WALL:
                valid_moves.append(Move.LEFT)

            y_tmp = y + 1
            if y_tmp < self.map.shape[1] and self.map[x, y_tmp] is not MazeCell.WALL:
                valid_moves.append(Move.RIGHT)

        return valid_moves
    def state_index(self, state: Position) -> int:
        return self._state_to_index[state]

    def _won(self) -> bool:
        return self.terminal_state(self._current_state)

    def terminal_state(self, state: Position) -> bool:
        return self.map[state] is MazeCell.EXIT
    def _load_map(self, filepath: Path) -> None:
        with open(filepath) as f:
            lines = f.readlines()
        # rstrip("\n") instead of [:-1]: avoids truncating the last cell when the file has no trailing newline
        self.map = np.asarray([[MazeCell(symbol) for symbol in line.rstrip("\n").split("\t")] for line in lines])
        self._initial_state = np.asarray(self.map == MazeCell.START).nonzero()
        self._initial_state = (int(self._initial_state[0][0]), int(self._initial_state[1][0]))
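

# Minimal usage sketch (not part of the original module). It assumes a tab-separated map file using the MazeCell
# symbols above (the path below is purely hypothetical) and that the GridWorld base class exposes a gym-style
# reset() method returning the initial state.
if __name__ == "__main__":
    import random

    env = Maze(map_filepath=Path("data/maze.txt"), horizon=20)  # hypothetical map path and horizon
    state = env.reset()
    for _ in range(20):  # bounded rollout with uniformly random valid moves
        action = random.choice(env.valid_actions(state))
        state, reward, done, info = env.step(action)
        print(state, reward, done, info)
        if done:
            break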