# 使用 Actor-Critic 方法玩 CartPole 游戏

Actor-Critic 方法

Actor-Critic 方法是表示与价值函数无关的策略函数的时间差分 (TD) 学习方法。

CartPole-v0

CartPole-v0 环境中，将长杆连接到沿无摩擦轨道移动的小车上。长杆开始时是直立的，代理的目标是通过对小车施加 -1 或 +1 的力来防止其倒下。对于长杆保持直立的每个时间步骤，奖励为 +1。当 1) 长杆与垂直方向的夹角超过 15 度或 2) 小车从中心移出超过 2.4 个单位时，片段结束。

<figure>
<image src="https://tensorflow.org/tutorials/reinforcement_learning/images/cartpole-v0.gif">
<figcaption>
Trained actor-critic model in Cartpole-v0 environment
</figcaption>
</figure>

## 安装

pip install gym[classic_control]
pip install pyglet
# Install additional packages for visualization
sudo apt-get install -y python-opengl > /dev/null 2>&1
pip install git+https://github.com/tensorflow/docs > /dev/null 2>&1
import collections
import gym
import numpy as np
import statistics
import tensorflow as tf
import tqdm

from matplotlib import pyplot as plt
from tensorflow.keras import layers
from typing import Any, List, Sequence, Tuple

# Create the environment
env = gym.make("CartPole-v1")

# Set seed for experiment reproducibility
seed = 42
tf.random.set_seed(seed)
np.random.seed(seed)

# Small epsilon value for stabilizing division operations
# (used when standardizing returns in `get_expected_return`).
eps = np.finfo(np.float32).eps.item()


## 模型

class ActorCritic(tf.keras.Model):
    """Combined actor-critic network with one shared hidden layer."""

    def __init__(self, num_actions: int, num_hidden_units: int):
        """Initialize the shared trunk and the two output heads.

        Args:
          num_actions: Size of the action space (logits emitted by the actor).
          num_hidden_units: Width of the shared hidden layer.
        """
        super().__init__()

        # Shared representation feeding both the actor and the critic.
        self.common = layers.Dense(num_hidden_units, activation="relu")
        # Policy head: unnormalized action logits.
        self.actor = layers.Dense(num_actions)
        # Value head: scalar state-value estimate.
        self.critic = layers.Dense(1)

    def call(self, inputs: tf.Tensor) -> Tuple[tf.Tensor, tf.Tensor]:
        """Returns (action logits, critic value) for a batch of states."""
        hidden = self.common(inputs)
        return self.actor(hidden), self.critic(hidden)

num_actions = env.action_space.n  # 2
num_hidden_units = 128  # width of the shared hidden layer

model = ActorCritic(num_actions, num_hidden_units)


## 训练代理

1. 在环境上运行代理以收集每个片段的训练数据。
2. 计算每个时间步骤的预期回报。
3. 计算组合的 Actor-Critic 模型的损失。
4. 计算梯度并更新网络参数。
5. 重复第 1-4 步，直至达到成功标准或最大片段数。

### 1. 收集训练数据

# Wrap Gym's env.step call as an operation in a TensorFlow function.
# This would allow it to be included in a callable TensorFlow graph.

def env_step(action: np.ndarray) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """Returns state, reward and done flag given an action.

    Args:
      action: Discrete action to apply to the environment.

    Returns:
      Tuple of (state as float32, reward as int32, done flag as int32).
    """
    state, reward, done, truncated, info = env.step(action)
    # Treat time-limit truncation the same as termination so the episode
    # collection loop stops instead of stepping an already-finished env.
    return (state.astype(np.float32),
            np.array(reward, np.int32),
            np.array(done or truncated, np.int32))

def tf_env_step(action: tf.Tensor) -> List[tf.Tensor]:
    """Wraps `env_step` as a TensorFlow op so it can run inside a graph."""
    return tf.numpy_function(
        env_step, [action], [tf.float32, tf.int32, tf.int32])

def run_episode(
        initial_state: tf.Tensor,
        model: tf.keras.Model,
        max_steps: int) -> Tuple[tf.Tensor, tf.Tensor, tf.Tensor]:
    """Runs a single episode to collect training data.

    Samples actions from the model's policy until the env reports done or
    `max_steps` is reached. Returns per-step (action probabilities,
    critic values, rewards) as stacked tensors.
    """
    action_probs = tf.TensorArray(dtype=tf.float32, size=0, dynamic_size=True)
    values = tf.TensorArray(dtype=tf.float32, size=0, dynamic_size=True)
    rewards = tf.TensorArray(dtype=tf.int32, size=0, dynamic_size=True)

    initial_state_shape = initial_state.shape
    state = initial_state

    for step in tf.range(max_steps):
        # Add a batch dimension (batch size = 1) for the model call.
        batched_state = tf.expand_dims(state, 0)

        # One forward pass yields policy logits and the critic's value.
        logits, critic_value = model(batched_state)

        # Sample the next action from the policy distribution.
        action = tf.random.categorical(logits, 1)[0, 0]
        probs = tf.nn.softmax(logits)

        # Record the critic value and the chosen action's probability.
        values = values.write(step, tf.squeeze(critic_value))
        action_probs = action_probs.write(step, probs[0, action])

        # Step the environment with the sampled action.
        state, reward, done = tf_env_step(action)
        # tf.numpy_function loses static shape info; restore it so the
        # graph-mode loop keeps a consistent state shape across iterations.
        state.set_shape(initial_state_shape)

        rewards = rewards.write(step, reward)

        if tf.cast(done, tf.bool):
            break

    return action_probs.stack(), values.stack(), rewards.stack()


### 2. 计算预期回报

$G_{t} = \sum^{T}_{t'=t} \gamma^{t'-t} r_{t'}$

def get_expected_return(
        rewards: tf.Tensor,
        gamma: float,
        standardize: bool = True) -> tf.Tensor:
    """Compute expected returns per timestep.

    G_t = sum_{t'=t..T} gamma^(t'-t) * r_{t'}. When `standardize` is True
    the returns are normalized to zero mean and unit variance, which
    stabilizes training.
    """
    n = tf.shape(rewards)[0]
    returns = tf.TensorArray(dtype=tf.float32, size=n)

    # Walk the rewards back-to-front, accumulating the discounted sum.
    reversed_rewards = tf.cast(rewards[::-1], dtype=tf.float32)
    running_sum = tf.constant(0.0)
    running_sum_shape = running_sum.shape
    for i in tf.range(n):
        running_sum = reversed_rewards[i] + gamma * running_sum
        # Keep the static shape so the graph-mode loop stays well-typed.
        running_sum.set_shape(running_sum_shape)
        returns = returns.write(i, running_sum)
    # The array was filled in reverse time order; flip it back.
    returns = returns.stack()[::-1]

    if standardize:
        returns = ((returns - tf.math.reduce_mean(returns)) /
                   (tf.math.reduce_std(returns) + eps))

    return returns


### 3. Actor-Critic 损失

$L = L_{actor} + L_{critic}$

#### 行动者损失

$L_{actor} = -\sum^{T}_{t=1} \log \pi_{\theta}(a_{t} | s_{t}) [G(s_{t}, a_{t}) - V^{\pi}_{\theta}(s_{t})]$

• $$T$$：每个片段的时间步骤数，各个片段可能有所不同
• $$s_{t}$$：时间步骤 $$t$$ 的状态
• $$a_{t}$$：对于给定状态 $$s$$，在时间步骤 $$t$$ 选择的动作
• $$\pi_{\theta}$$：由 $$\theta$$ 参数化的策略（行动者）
• $$V^{\pi}_{\theta}$$：由 $$\theta$$ 参数化的价值函数（评价者）
• $$G = G_{t}$$：对于给定的状态、动作对，在时间步骤 $$t$$ 的预期回报

##### 优势

$$L_{actor}$$ 公式中的 $$G - V$$ 项称为优势，它表示对于给定的特定状态，所采取的动作比按照该状态下的策略 $$\pi$$ 随机选择的动作好多少。

#### 评价者损失

$L_{critic} = L_{\delta}(G, V^{\pi}_{\theta})$

# Huber loss for the critic: less sensitive to outliers than squared error.
huber_loss = tf.keras.losses.Huber(reduction=tf.keras.losses.Reduction.SUM)

def compute_loss(
        action_probs: tf.Tensor,
        values: tf.Tensor,
        returns: tf.Tensor) -> tf.Tensor:
    """Computes the combined Actor-Critic loss.

    Args:
      action_probs: Probabilities of the actions actually taken, shape [T, 1].
      values: Critic value estimates per timestep, shape [T, 1].
      returns: Expected (discounted) returns per timestep, shape [T, 1].

    Returns:
      Scalar loss = actor loss + critic (Huber) loss.
    """
    # Advantage: how much better the taken action turned out than the
    # critic's estimate of the state value.
    advantage = returns - values

    action_log_probs = tf.math.log(action_probs)
    # Policy-gradient (actor) loss; the minus sign turns gradient ascent
    # on expected return into a minimization objective.
    actor_loss = -tf.math.reduce_sum(action_log_probs * advantage)

    # Critic regression loss between value estimates and returns.
    critic_loss = huber_loss(values, returns)

    return actor_loss + critic_loss


### 4. 定义训练步骤以更新参数

tf.function 上下文应用于 train_step 函数，这样便可将其编译为可调用的 TensorFlow 计算图，进而可以将训练速度提高 10 倍。

# Adam optimizer shared by the actor and critic (one combined model).
optimizer = tf.keras.optimizers.Adam(learning_rate=0.01)

@tf.function
def train_step(
        initial_state: tf.Tensor,
        model: tf.keras.Model,
        optimizer: tf.keras.optimizers.Optimizer,
        gamma: float,
        max_steps_per_episode: int) -> tf.Tensor:
    """Runs a model training step.

    Collects one episode, computes the combined actor-critic loss and
    applies one gradient update to `model`'s parameters.

    Returns:
      The episode's total (undiscounted) reward.
    """
    # The tape must record the episode rollout and loss computation so
    # gradients can flow back to the model parameters.
    with tf.GradientTape() as tape:

        # Run the model for one episode to collect training data
        action_probs, values, rewards = run_episode(
            initial_state, model, max_steps_per_episode)

        # Calculate the expected returns
        returns = get_expected_return(rewards, gamma)

        # Convert training data to appropriate TF tensor shapes
        action_probs, values, returns = [
            tf.expand_dims(x, 1) for x in [action_probs, values, returns]]

        # Calculate the loss values to update our network
        loss = compute_loss(action_probs, values, returns)

    # Compute the gradients from the loss
    grads = tape.gradient(loss, model.trainable_variables)

    # Apply the gradients to the model's parameters
    optimizer.apply_gradients(zip(grads, model.trainable_variables))

    episode_reward = tf.math.reduce_sum(rewards)

    return episode_reward


### 5. 运行训练循环

%%time

min_episodes_criterion = 100
max_episodes = 10000
max_steps_per_episode = 500

# CartPole-v1 is considered solved when the average reward is >= 475
# over 100 consecutive trials (hence min_episodes_criterion above).
reward_threshold = 475
running_reward = 0

# The discount factor for future rewards
gamma = 0.99

# Rolling window of the most recent episode rewards for the running average.
episodes_reward: collections.deque = collections.deque(maxlen=min_episodes_criterion)

progress = tqdm.trange(max_episodes)
for episode in progress:
    initial_state, info = env.reset()
    initial_state = tf.constant(initial_state, dtype=tf.float32)
    episode_reward = int(train_step(
        initial_state, model, optimizer, gamma, max_steps_per_episode))

    episodes_reward.append(episode_reward)
    running_reward = statistics.mean(episodes_reward)

    progress.set_postfix(
        episode_reward=episode_reward, running_reward=running_reward)

    # Show the average episode reward every 10 episodes
    if episode % 10 == 0:
        pass  # print(f'Episode {episode}: average reward: {avg_reward}')

    # Stop once the rolling average clears the threshold (and enough
    # episodes have been seen for the average to be meaningful).
    if running_reward > reward_threshold and episode >= min_episodes_criterion:
        break

print(f'\nSolved at episode {episode}: average reward: {running_reward:.2f}!')


## 可视化

# Render an episode and save as a GIF file

from IPython import display as ipythondisplay
from PIL import Image

# Rendering environment: render() returns RGB frames for building the GIF.
render_env = gym.make("CartPole-v1", render_mode='rgb_array')

def render_episode(env: gym.Env, model: tf.keras.Model, max_steps: int):
    """Greedily plays one episode and returns rendered frames as PIL images.

    NOTE(review): the `env` parameter is ignored — the module-level
    `render_env` (created with render_mode='rgb_array') is stepped and
    rendered instead. Confirm whether the parameter should be honored.
    """
    state, info = render_env.reset()
    state = tf.constant(state, dtype=tf.float32)
    frames = [Image.fromarray(render_env.render())]

    for step in range(1, max_steps + 1):
        # Pick the most probable action (greedy policy, no sampling).
        batched_state = tf.expand_dims(state, 0)
        action_probs, _ = model(batched_state)
        action = np.argmax(np.squeeze(action_probs))

        state, reward, done, truncated, info = render_env.step(action)
        state = tf.constant(state, dtype=tf.float32)

        # Render a frame every 10 steps to keep the GIF small.
        if step % 10 == 0:
            frames.append(Image.fromarray(render_env.render()))

        if done:
            break

    return frames

# Save GIF image
# Pass `render_env` (created with render_mode='rgb_array') so the argument
# matches the environment that is actually rendered.
images = render_episode(render_env, model, max_steps_per_episode)
image_file = 'cartpole-v1.gif'
# loop=0: loop forever, duration=1: play each frame for 1ms
images[0].save(
    image_file, save_all=True, append_images=images[1:], loop=0, duration=1)

import tensorflow_docs.vis.embed as embed
embed.embed_file(image_file)


## 后续步骤

[]
[]
{"lastModified": "\u6700\u540e\u66f4\u65b0\u65f6\u95f4 (UTC)\uff1a2024-01-11\u3002"}