First, the environment used in today's reinforcement learning study: Cliff Walking.
It is an R×C two-dimensional grid; in the bottom row, the first column is the start and the last column is the goal.
The agent receives a reward of -1 for every step, and a reward of -100 for stepping into the cliff.
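
For intuition, assuming the common 4×12 layout (the size is just an example; the class below takes any n_rows and n_cols), the grid looks like this, with S the start, G the goal, and C the cliff:

. . . . . . . . . . . .
. . . . . . . . . . . .
. . . . . . . . . . . .
S C C C C C C C C C C G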

class CliffWalk:
    """The cliff is fixed in the bottom row: its first column is the start, its last column is the
    goal, and every other cell in that row is the cliff."""
    def __init__(self, n_rows, n_cols):
        self.n_rows = n_rows
        self.n_cols = n_cols
        self.n_state = self.n_rows * self.n_cols
        # P[state][action] = [(prob, next_state, reward, done)]
        self.P = self.init_P()

    def init_P(self):
        P = [[[] for _ in range(4)] for _ in range(self.n_rows * self.n_cols)]
        # action deltas [dx, dy]: up, down, left, right
        action = [[0, -1], [0, 1], [-1, 0], [1, 0]]
        for i in range(self.n_rows):
            for j in range(self.n_cols):
                cur_state = i * self.n_cols + j
                for a in range(4):
                    # If the current cell is the cliff or the goal, the episode is over:
                    # reward 0, stay in place
                    if i == self.n_rows - 1 and j > 0:
                        P[cur_state][a] = [(1, cur_state, 0, True)]
                        continue
                    next_x = min(self.n_cols - 1, max(0, j + action[a][0]))
                    next_y = min(self.n_rows - 1, max(0, i + action[a][1]))
                    next_state = next_y * self.n_cols + next_x
                    reward = -1
                    done = False
                    if next_y == self.n_rows - 1 and next_x > 0:
                        done = True
                        if next_x != self.n_cols - 1:  # landed in the cliff, not the goal
                            reward = -100
                    P[cur_state][a] = [(1, next_state, reward, done)]
        return P
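
A minimal usage sketch (the 4×12 size is just an example choice): build the environment and inspect one transition tuple.

env = CliffWalk(n_rows=4, n_cols=12)
start = (env.n_rows - 1) * env.n_cols   # bottom-left start state
print(env.P[start][3])                  # action 3 is "move right", straight into the cliff
# -> [(1, 37, -100, True)]: a single deterministic transition into a cliff cell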

Dynamic Programming

Policy Iteration

Policy iteration consists of two main procedures: policy evaluation and policy improvement.
The algorithm details are spelled out in the code comments, and the backup formulas are written out right below.
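
For reference, the two steps the code below implements are the standard Bellman expectation backup (evaluation) followed by a greedy step (improvement), matching the code comments:

$$V_{k+1}(s) = \sum_{a} \pi(a \mid s) \sum_{s'} p(s' \mid s, a)\big[r(s, a, s') + \gamma V_k(s')\big]$$

$$\pi'(s) = \arg\max_{a} \sum_{s'} p(s' \mid s, a)\big[r(s, a, s') + \gamma V(s')\big]$$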

"""
这个策略迭代算法包含了两个部分,
策略评估:
该部分的重点在于,更新价值状态,即V(s),使用策略边缘化动作也就是pi[s][a] * Q[s][a]来计算V(s)
Q[s][a]使用p*(reward + gamma * V(s') * (1 - done))来计算,(1-done)用来在终止状态去掉V(s')
这部分最终获得的就是V(s)
策略提升:
该部分重点在于,更新策略,即pi[s][a],
通过评估完成的V(s)来计算pi[s][a],pi[s][a] = argmaxQ[s][a] arg a∈A
策略迭代算法首先进行策略评估让V收敛,然后在收敛的V上进行策略提升获得一个新的pi
然后下一次迭代使用新的pi进行策略评估获得一个收敛的V,然后再在新的V上获得一个新的pi,如此循环
"""
class PolicyIteration:
def __init__(self, env, theta, gamma):
self.env = env
self.n_state = env.n_state if hasattr(env, 'n_state') else env.ncol*env.nrow
self.pi = [[0.25, 0.25, 0.25, 0.25] for _ in range(self.n_state)]
self.v = [0] * self.n_state
self.theta = theta
self.gamma = gamma

def _get_qsalist(self, s, is_imporvement):
Q_s_a_list = []
for a in range(4):
Q_s_a = 0
"""
采取一个动作后下一个状态可能存在多个,虽然在Cliff里面下一个状态是确定的(也就是采取一个动作后只有一个下一状态),
但是这个四元组的P存储的是这一动作到达下一状态的概率
"""
for action_state in self.env.P[s][a]:
prob, next_state, reward, done = action_state
Q_s_a += prob * (reward + self.gamma * self.v[next_state] * (1 - done))
if is_imporvement:
"""在策略提升过程中,我们需要获得最好的动作价值,因此不能将动作边缘化"""
Q_s_a_list.append(Q_s_a)
else:
"""在策略评估过程中,我们需要边缘化动作而获得某一状态下的价值,因此需要将策略与动作价值函数相乘以活动状态价值"""
Q_s_a_list.append(self.pi[s][a]*Q_s_a)
return Q_s_a_list

def policy_evaluation(self):
"""更新所有状态的V值"""
iteration_times = 1
while 1:
max_diff = 0
new_v = [0] * self.n_state
for s in range(self.n_state):
Q_s_a_list = self._get_qsalist(s, is_imporvement=False)
new_v[s] = sum(Q_s_a_list) # 这里使用sum是因为Q_s_a_list中已经边缘化了动作,sum相当于对Qsa加权平均了
max_diff = max(abs(new_v[s] - self.v[s]), max_diff)
self.v = new_v
iteration_times += 1
if max_diff < self.theta: break # 满足收敛条件,退出评估迭代
iteration_times += 1
print(f"policy evaluation finished iterations at {iteration_times} times")

def policy_improvement(self):
for s in range(self.n_state):
Q_s_a_list = self._get_qsalist(s, is_imporvement=True)
max_Q = max(Q_s_a_list)
cnt = Q_s_a_list.count(max_Q)
self.pi[s] = [1/cnt if q == max_Q else 0 for q in Q_s_a_list]
print(f"policy improvment finished")
return self.pi

def policy_iteration(self):
while 1:
self.policy_evaluation()
old_pi = copy.deepcopy(self.pi)
new_pi = self.policy_improvement()
if old_pi == new_pi:break
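
A sketch of how I would run it (grid size, theta, and gamma are arbitrary example values):

env = CliffWalk(n_rows=4, n_cols=12)
agent = PolicyIteration(env, theta=0.001, gamma=0.9)
agent.policy_iteration()
# afterwards agent.v holds the converged state values and agent.pi the greedy policy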

The problem is that this algorithm takes a long time: most of the work is in policy evaluation, which needs many sweeps before the maximum change falls below the threshold.
So the question arises: can we do policy improvement without letting the state values fully converge? That idea leads to value iteration.

Value Iteration
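
In formula form, value iteration folds the max over actions directly into the value backup (the Bellman optimality backup), which is what the code below performs:

$$V_{k+1}(s) = \max_{a} \sum_{s'} p(s' \mid s, a)\big[r(s, a, s') + \gamma V_k(s')\big]$$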

"""
与策略提升算法相比,价值提升的迭代次数更少,使用的方法也更加直接
在策略评估的过程中没有使用Qsa的加权平均,而是直接取最大的Qsa作为Vs的值,这样也就不需要状态价值完全收敛了
在策略提升过程中就是使用上一步得到的Vs更新策略
"""
class ValueIteration:
def __init__(self, env, theta, gamma):
self.env = env
self.n_state = env.n_state if hasattr(env, 'n_state') else env.ncol*env.nrow
self.pi = [[0.25, 0.25, 0.25, 0.25] for _ in range(self.n_state)]
self.v = [0] * self.n_state
self.theta = theta
self.gamma = gamma

def _get_qsalist(self, s):
Q_s_a_list = []
for a in range(4):
Q_s_a = 0
"""
采取一个动作后下一个状态可能存在多个,虽然在Cliff里面下一个状态是确定的(也就是采取一个动作后只有一个下一状态),
但是这个四元组的P存储的是这一动作到达下一状态的概率
"""
for action_state in self.env.P[s][a]:
prob, next_state, reward, done = action_state
Q_s_a += prob * (reward + self.gamma * self.v[next_state] * (1 - done))
Q_s_a_list.append(Q_s_a)
return Q_s_a_list

def value_iteration(self):
iter_times = 0
while 1:
max_diff = 0
new_v = [0] * self.n_state
for s in range(self.n_state):
Q_s_a_list = self._get_qsalist(s)
new_v[s] = max(Q_s_a_list) # diff from policy iteration
max_diff = max(abs(new_v[s] - self.v[s]), max_diff)
self.v = new_v
iter_times += 1
if max_diff < self.theta: break # 满足收敛条件,退出评估迭代
print(f"iteration finished at {iter_times} times")
self.policy_improve()

def policy_improve(self):
for s in range(self.n_state):
Q_s_a_list = self._get_qsalist(s)
max_Q = max(Q_s_a_list)
cnt = Q_s_a_list.count(max_Q)
self.pi[s] = [1/cnt if q == max_Q else 0 for q in Q_s_a_list]
print(f"policy improvment finished")

Temporal Difference

Dynamic programming requires complete knowledge of the environment, which is nearly impossible in many problems, so temporal-difference methods were developed to let the agent learn while interacting with the environment.
My feeling is that the name comes from each update bootstrapping from values produced by earlier updates; dynamic programming's policy improvement, in contrast, uses the state values computed within the current iteration, while temporal difference does not.
To keep the environment unknown to the agent, the environment class needs a small change.
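
As a reference formula (stated for state values here; the methods below apply the same idea to Q values), the basic TD(0) update bootstraps the current estimate from the next step's estimate:

$$V(s_t) \leftarrow V(s_t) + \alpha\big[r_t + \gamma V(s_{t+1}) - V(s_t)\big]$$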

class CliffWalking:
    def __init__(self, n_cols, n_rows):
        self.n_cols = n_cols
        self.n_rows = n_rows
        self.n_state = n_cols * n_rows
        self.agent_x = 0
        self.agent_y = n_rows - 1  # start at the bottom-left corner

    def step(self, action):
        # action deltas [dx, dy]: up, down, left, right
        actions = [[0, -1], [0, 1], [-1, 0], [1, 0]]
        self.agent_x = min(self.n_cols - 1, max(0, self.agent_x + actions[action][0]))
        self.agent_y = min(self.n_rows - 1, max(0, self.agent_y + actions[action][1]))
        next_state = self.agent_y * self.n_cols + self.agent_x
        reward = -1
        done = False
        if self.agent_y == self.n_rows - 1 and self.agent_x > 0:
            done = True
            if self.agent_x != self.n_cols - 1:  # fell into the cliff, not the goal
                reward = -100
        return next_state, reward, done

    def reset(self):
        self.agent_x = 0
        self.agent_y = self.n_rows - 1
        return self.agent_x + self.agent_y * self.n_cols

Sarsa

The update rule is quite simple; it appears in the code and is written out below.
The algorithm updates the Q value of the current state-action pair using the next state and the next action actually taken during the same rollout.
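
Written out, the update performed by the code below is:

$$Q(s_t, a_t) \leftarrow Q(s_t, a_t) + \alpha\big[r_t + \gamma Q(s_{t+1}, a_{t+1}) - Q(s_t, a_t)\big]$$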

import numpy as np


class Sarsa:
    def __init__(self, n_states, n_actions, alpha, gamma, epsilon):
        self.Q_table = np.zeros([n_states, n_actions])
        self.n_actions = n_actions
        self.alpha = alpha
        self.gamma = gamma
        self.epsilon = epsilon

    def take_action(self, state):
        """Epsilon-greedy: even right next to the cliff there is a chance of stepping into it."""
        if np.random.random() < self.epsilon:
            action = np.random.randint(self.n_actions)
            # print(f"random action: {action}")
        else:
            # print(f"Q_table[state] shape = {self.Q_table[state].shape}, state = {state}")
            action = np.argmax(self.Q_table[state])
            # print(f"greedy action: {action}")
        return action

    def best_action(self, state):
        """Return a 0/1 mask of the actions whose Q value is maximal in the current state."""
        a = [0 for _ in range(self.n_actions)]
        Q_max = np.max(self.Q_table[state])
        for i in range(self.n_actions):
            if self.Q_table[state][i] == Q_max:
                a[i] = 1
        return a

    def update(self, cur_state, cur_action, reward, next_state, next_action):
        self.Q_table[cur_state, cur_action] += self.alpha * (
            reward + self.gamma * self.Q_table[next_state, next_action] - self.Q_table[cur_state, cur_action])
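
A minimal training-loop sketch against the CliffWalking environment above (the episode count and hyperparameters are arbitrary example values). The on-policy pattern is visible here: the next action is sampled before the update and is then actually executed.

env = CliffWalking(n_cols=12, n_rows=4)
agent = Sarsa(n_states=env.n_state, n_actions=4, alpha=0.1, gamma=0.9, epsilon=0.1)

for episode in range(500):
    state = env.reset()
    action = agent.take_action(state)
    done = False
    while not done:
        next_state, reward, done = env.step(action)
        next_action = agent.take_action(next_state)    # sampled from the same epsilon-greedy policy
        agent.update(state, action, reward, next_state, next_action)
        state, action = next_state, next_action        # the sampled action is the one actually taken next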

NStepSarsa

A buffer is introduced to store past states, actions, and rewards. After n steps have accumulated, the Q value of the first state-action pair in the buffer is updated. If a terminal state is reached within the sequence, every state-action pair in those n steps is updated instead, so that reaching the terminal state can be credited to all the state-action pairs that contributed to it. The n-step return being computed is written out below.
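
The n-step return that the buffer accumulates is the standard one:

$$G_t = r_t + \gamma r_{t+1} + \cdots + \gamma^{n-1} r_{t+n-1} + \gamma^{n} Q(s_{t+n}, a_{t+n})$$

and Q(s_t, a_t) is then moved toward G_t with step size alpha.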

class NStepSarsa(Sarsa):
    def __init__(self, n, n_states, n_actions, alpha, gamma, epsilon):
        super().__init__(n_states, n_actions, alpha, gamma, epsilon)
        self.actions_buffer = []
        self.state_buffer = []
        self.reward_buffer = []
        self.n = n

    def update(self, cur_state, cur_action, reward, next_state, next_action, done):
        """
        Once the N-step sequence is complete, the Q value of the state-action pair at the start of
        the sequence is updated. If a terminal state is reached anywhere within those N steps, the
        state-action pairs leading up to it are updated as well, so that they share credit for the
        terminal outcome.
        """
        self.actions_buffer.append(cur_action)
        self.state_buffer.append(cur_state)
        self.reward_buffer.append(reward)
        if len(self.actions_buffer) == self.n:
            G = self.Q_table[next_state, next_action]
            for i in reversed(range(self.n)):
                G = self.reward_buffer[i] + self.gamma * G
                # Update the state-action pairs directly connected to the terminal state
                if done and i > 0:
                    s = self.state_buffer[i]
                    a = self.actions_buffer[i]
                    self.Q_table[s, a] += self.alpha * (G - self.Q_table[s, a])
            # After each update, drop the oldest state, action, and reward to make room
            # for the next N-step window.
            s = self.state_buffer.pop(0)
            a = self.actions_buffer.pop(0)
            self.reward_buffer.pop(0)
            self.Q_table[s, a] += self.alpha * (G - self.Q_table[s, a])
        if done:
            self.actions_buffer = []
            self.state_buffer = []
            self.reward_buffer = []

QLearning

The update here is even simpler: the next action is not even sampled for the update. Instead, the maximum Q value over actions at the next state, as estimated so far, is used as the bootstrap target; I see this as another way of reusing previously learned information. The update rule is written out below.
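
Written out, the update bootstraps from the greedy action at the next state rather than from the action actually taken:

$$Q(s_t, a_t) \leftarrow Q(s_t, a_t) + \alpha\big[r_t + \gamma \max_{a'} Q(s_{t+1}, a') - Q(s_t, a_t)\big]$$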

class Qlearning(Sarsa):
    def __init__(self, n_states, n_actions, alpha, gamma, epsilon):
        super().__init__(n_states, n_actions, alpha, gamma, epsilon)

    def update(self, state, action, reward, next_state):
        self.Q_table[state, action] += self.alpha * (
            reward + self.gamma * np.max(self.Q_table[next_state]) - self.Q_table[state, action])
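
The training loop is simpler than Sarsa's because the next action only needs to be chosen when it is actually taken, not before the update (hyperparameters are again arbitrary example values):

env = CliffWalking(n_cols=12, n_rows=4)
agent = Qlearning(n_states=env.n_state, n_actions=4, alpha=0.1, gamma=0.9, epsilon=0.1)

for episode in range(500):
    state = env.reset()
    done = False
    while not done:
        action = agent.take_action(state)
        next_state, reward, done = env.step(action)
        agent.update(state, action, reward, next_state)   # the target uses the max over next actions
        state = next_state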

There is the on-policy vs. off-policy distinction here; remember to look it up.

Some Thoughts

The reinforcement learning methods I have studied so far seem to require a great deal of interaction with the environment, but in some settings interacting that many times is very risky. In examples like the cliff walk or the frozen lake, a real agent could not be allowed to fail over and over, so the environment has to be simulated instead. My feeling (based on what I have learned so far) is that how well reinforcement learning works in practice depends almost entirely on how good that simulation of the environment is.

Another thought: reinforcement learning requires continual interaction with one environment to learn. Once an agent has adapted to one environment through reinforcement learning, is it essentially unable to operate in a different environment without interacting and learning there as well?

Both of the above are just guesses based on what I have learned so far.